11#include " tx_write.h"
22
33namespace NKikimr ::NColumnShard {
4- bool TTxWrite::InsertOneBlob (TTransactionContext& txc, const TEvPrivate::TEvWriteBlobsResult::TPutBlobData& blobData, const TWriteId writeId, const TString& blob) {
5- const NKikimrTxColumnShard::TLogicalMetadata& meta = blobData.GetLogicalMeta ();
6-
7- const auto & blobRange = blobData.GetBlobRange ();
4+ bool TTxWrite::InsertOneBlob (TTransactionContext& txc, const NOlap::TWideSerializedBatch& batch, const TWriteId writeId) {
5+ NKikimrTxColumnShard::TLogicalMetadata meta;
6+ meta.SetNumRows (batch->GetRowsCount ());
7+ meta.SetRawBytes (batch->GetRawBytes ());
8+ meta.SetDirtyWriteTimeSeconds (batch.GetStartInstant ().Seconds ());
9+ meta.SetSpecialKeysRawData (batch->GetSpecialKeysSafe ().SerializeToString ());
10+
11+ const auto & blobRange = batch.GetRange ();
812 Y_ABORT_UNLESS (blobRange.GetBlobId ().IsValid ());
913
1014 // First write wins
1115 TBlobGroupSelector dsGroupSelector (Self->Info ());
1216 NOlap::TDbWrapper dbTable (txc.DB , &dsGroupSelector);
1317
14- const auto & writeMeta (PutBlobResult-> Get ()->GetWriteMeta () );
15-
16- auto tableSchema = Self->TablesManager .GetPrimaryIndex ()->GetVersionedIndex ().GetSchemaVerified (PutBlobResult-> Get ()-> GetSchemaVersion () );
18+ const auto & writeMeta = batch. GetAggregation (). GetWriteData ()->GetWriteMeta ();
19+ auto schemeVersion = batch. GetAggregation (). GetWriteData ()-> GetData ()-> GetSchemaVersion ();
20+ auto tableSchema = Self->TablesManager .GetPrimaryIndex ()->GetVersionedIndex ().GetSchemaVerified (schemeVersion );
1721
18- NOlap::TInsertedData insertData ((ui64)writeId, writeMeta.GetTableId (), writeMeta.GetDedupId (), blobRange, meta, tableSchema->GetVersion (), blob );
22+ NOlap::TInsertedData insertData ((ui64)writeId, writeMeta.GetTableId (), writeMeta.GetDedupId (), blobRange, meta, tableSchema->GetVersion (), batch-> GetData () );
1923 bool ok = Self->InsertTable ->Insert (dbTable, std::move (insertData));
2024 if (ok) {
21- // Put new data into blob cache
22- Y_ABORT_UNLESS (blobRange.IsFullBlob ());
23-
2425 Self->UpdateInsertTableCounters ();
2526 return true ;
2627 }
@@ -31,62 +32,89 @@ bool TTxWrite::InsertOneBlob(TTransactionContext& txc, const TEvPrivate::TEvWrit
3132bool TTxWrite::Execute (TTransactionContext& txc, const TActorContext&) {
3233 NActors::TLogContextGuard logGuard = NActors::TLogContextBuilder::Build (NKikimrServices::TX_COLUMNSHARD)(" tablet_id" , Self->TabletID ())(" tx_state" , " execute" );
3334 ACFL_DEBUG (" event" , " start_execute" );
34- const auto & writeMeta (PutBlobResult->Get ()->GetWriteMeta ());
35- Y_ABORT_UNLESS (Self->TablesManager .IsReadyForWrite (writeMeta.GetTableId ()));
36-
37- txc.DB .NoMoreReadsForTx ();
38- TWriteOperation::TPtr operation;
39- if (writeMeta.HasLongTxId ()) {
40- AFL_VERIFY (PutBlobResult->Get ()->GetBlobData ().size () == 1 )(" count" , PutBlobResult->Get ()->GetBlobData ().size ());
41- } else {
42- operation = Self->OperationsManager ->GetOperation ((TWriteId)writeMeta.GetWriteId ());
43- Y_ABORT_UNLESS (operation);
44- Y_ABORT_UNLESS (operation->GetStatus () == EOperationStatus::Started);
45- }
35+ const NOlap::TWritingBuffer& buffer = PutBlobResult->Get ()->MutableWritesBuffer ();
36+ for (auto && aggr : buffer.GetAggregations ()) {
37+ const auto & writeMeta = aggr->GetWriteData ()->GetWriteMeta ();
38+ Y_ABORT_UNLESS (Self->TablesManager .IsReadyForWrite (writeMeta.GetTableId ()));
39+ txc.DB .NoMoreReadsForTx ();
40+ TWriteOperation::TPtr operation;
41+ if (writeMeta.HasLongTxId ()) {
42+ AFL_VERIFY (aggr->GetSplittedBlobs ().size () == 1 )(" count" , aggr->GetSplittedBlobs ().size ());
43+ } else {
44+ operation = Self->OperationsManager ->GetOperation ((TWriteId)writeMeta.GetWriteId ());
45+ Y_ABORT_UNLESS (operation);
46+ Y_ABORT_UNLESS (operation->GetStatus () == EOperationStatus::Started);
47+ }
4648
47- TVector<TWriteId> writeIds;
48- for (auto blobData : PutBlobResult->Get ()->GetBlobData ()) {
4949 auto writeId = TWriteId (writeMeta.GetWriteId ());
50- if (operation) {
51- writeId = Self->BuildNextWriteId (txc);
52- } else {
50+ if (!operation) {
5351 NIceDb::TNiceDb db (txc.DB );
5452 writeId = Self->GetLongTxWrite (db, writeMeta.GetLongTxIdUnsafe (), writeMeta.GetWritePartId ());
53+ aggr->AddWriteId (writeId);
5554 }
5655
57- if (!InsertOneBlob (txc, blobData, writeId, PutBlobResult->Get ()->GetBlobVerified (blobData.GetBlobRange ()))) {
58- LOG_S_DEBUG (TxPrefix () << " duplicate writeId " << (ui64)writeId << TxSuffix ());
59- Self->IncCounter (COUNTER_WRITE_DUPLICATE);
56+ for (auto && i : aggr->GetSplittedBlobs ()) {
57+ if (operation) {
58+ writeId = Self->BuildNextWriteId (txc);
59+ aggr->AddWriteId (writeId);
60+ }
61+
62+ if (!InsertOneBlob (txc, i, writeId)) {
63+ LOG_S_DEBUG (TxPrefix () << " duplicate writeId " << (ui64)writeId << TxSuffix ());
64+ Self->IncCounter (COUNTER_WRITE_DUPLICATE);
65+ }
6066 }
61- writeIds.push_back (writeId);
6267 }
6368
6469 TBlobManagerDb blobManagerDb (txc.DB );
65- AFL_VERIFY (PutBlobResult->Get ()->GetActions ().size () == 1 );
66- AFL_VERIFY (PutBlobResult->Get ()->GetActions ().front ()->GetBlobsCount () == PutBlobResult->Get ()->GetBlobData ().size ());
67- for (auto && i : PutBlobResult->Get ()->GetActions ()) {
70+ AFL_VERIFY (buffer.GetAddActions ().size () == 1 );
71+ for (auto && i : buffer.GetAddActions ()) {
6872 i->OnExecuteTxAfterWrite (*Self, blobManagerDb, true );
6973 }
70-
71- if (operation) {
72- operation->OnWriteFinish (txc, writeIds);
73- auto txInfo = Self->ProgressTxController ->RegisterTxWithDeadline (operation->GetTxId (), NKikimrTxColumnShard::TX_KIND_COMMIT_WRITE, " " , writeMeta.GetSource (), 0 , txc);
74- Y_UNUSED (txInfo);
75- NEvents::TDataEvents::TCoordinatorInfo tInfo = Self->ProgressTxController ->GetCoordinatorInfo (operation->GetTxId ());
76- Result = NEvents::TDataEvents::TEvWriteResult::BuildPrepared (Self->TabletID (), operation->GetTxId (), tInfo);
77- } else {
78- Y_ABORT_UNLESS (writeIds.size () == 1 );
79- Result = std::make_unique<TEvColumnShard::TEvWriteResult>(Self->TabletID (), writeMeta, (ui64)writeIds.front (), NKikimrTxColumnShard::EResultStatus::SUCCESS);
74+ for (auto && i : buffer.GetRemoveActions ()) {
75+ i->OnExecuteTxAfterRemoving (*Self, blobManagerDb, true );
76+ }
77+ for (auto && aggr : buffer.GetAggregations ()) {
78+ const auto & writeMeta = aggr->GetWriteData ()->GetWriteMeta ();
79+ std::unique_ptr<TEvColumnShard::TEvWriteResult> result;
80+ TWriteOperation::TPtr operation;
81+ if (!writeMeta.HasLongTxId ()) {
82+ operation = Self->OperationsManager ->GetOperation ((TWriteId)writeMeta.GetWriteId ());
83+ Y_ABORT_UNLESS (operation);
84+ Y_ABORT_UNLESS (operation->GetStatus () == EOperationStatus::Started);
85+ }
86+ if (operation) {
87+ operation->OnWriteFinish (txc, aggr->GetWriteIds ());
88+ auto txInfo = Self->ProgressTxController ->RegisterTxWithDeadline (operation->GetTxId (), NKikimrTxColumnShard::TX_KIND_COMMIT_WRITE, " " , writeMeta.GetSource (), 0 , txc);
89+ Y_UNUSED (txInfo);
90+ NEvents::TDataEvents::TCoordinatorInfo tInfo = Self->ProgressTxController ->GetCoordinatorInfo (operation->GetTxId ());
91+ Results.emplace_back (NEvents::TDataEvents::TEvWriteResult::BuildPrepared (Self->TabletID (), operation->GetTxId (), tInfo));
92+ } else {
93+ Y_ABORT_UNLESS (aggr->GetWriteIds ().size () == 1 );
94+ Results.emplace_back (std::make_unique<TEvColumnShard::TEvWriteResult>(Self->TabletID (), writeMeta, (ui64)aggr->GetWriteIds ().front (), NKikimrTxColumnShard::EResultStatus::SUCCESS));
95+ }
8096 }
8197 return true ;
8298}
8399
84100void TTxWrite::Complete (const TActorContext& ctx) {
85101 NActors::TLogContextGuard logGuard = NActors::TLogContextBuilder::Build (NKikimrServices::TX_COLUMNSHARD)(" tablet_id" , Self->TabletID ())(" tx_state" , " complete" );
86- AFL_VERIFY (Result);
87- Self->CSCounters .OnWriteTxComplete ((TMonotonic::Now () - PutBlobResult->Get ()->GetWriteMeta ().GetWriteStartInstant ()).MilliSeconds ());
88- Self->CSCounters .OnSuccessWriteResponse ();
89- ctx.Send (PutBlobResult->Get ()->GetWriteMeta ().GetSource (), Result.release ());
102+ const auto now = TMonotonic::Now ();
103+ const NOlap::TWritingBuffer& buffer = PutBlobResult->Get ()->MutableWritesBuffer ();
104+ for (auto && i : buffer.GetAddActions ()) {
105+ i->OnCompleteTxAfterWrite (*Self);
106+ }
107+ for (auto && i : buffer.GetRemoveActions ()) {
108+ i->OnCompleteTxAfterRemoving (*Self);
109+ }
110+ AFL_VERIFY (buffer.GetAggregations ().size () == Results.size ());
111+ for (ui32 i = 0 ; i < buffer.GetAggregations ().size (); ++i) {
112+ const auto & writeMeta = buffer.GetAggregations ()[i]->GetWriteData ()->GetWriteMeta ();
113+ ctx.Send (writeMeta.GetSource (), Results[i].release ());
114+ Self->CSCounters .OnWriteTxComplete ((now - writeMeta.GetWriteStartInstant ()).MilliSeconds ());
115+ Self->CSCounters .OnSuccessWriteResponse ();
116+ }
117+
90118}
91119
92120}
0 commit comments