Skip to content

Commit fa7c3b4

Browse files
Merge 5738349 into 2876ba4
2 parents 2876ba4 + 5738349 commit fa7c3b4

File tree

2 files changed

+17
-16
lines changed

2 files changed

+17
-16
lines changed

ydb/core/tx/columnshard/blobs_action/transaction/tx_blobs_written.cpp

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -17,30 +17,28 @@ bool TTxBlobsWritingFinished::DoExecute(TTransactionContext& txc, const TActorCo
1717
auto& index = Self->MutableIndexAs<NOlap::TColumnEngineForLogs>();
1818
const auto minReadSnapshot = Self->GetMinReadSnapshot();
1919

20-
std::optional<EOperationBehaviour> packBehaviour;
2120
for (auto&& writeResult : Pack.GetWriteResults()) {
2221
const auto& writeMeta = writeResult.GetWriteMeta();
2322
AFL_VERIFY(!writeMeta.HasLongTxId());
2423
auto op = Self->GetOperationsManager().GetOperationVerified((TOperationWriteId)writeMeta.GetWriteId());
25-
if (!packBehaviour) {
26-
packBehaviour = op->GetBehaviour();
24+
if (!PackBehaviour) {
25+
PackBehaviour = op->GetBehaviour();
2726
} else {
28-
AFL_VERIFY(packBehaviour == op->GetBehaviour());
27+
AFL_VERIFY(PackBehaviour == op->GetBehaviour());
2928
}
3029
}
31-
AFL_VERIFY(!!packBehaviour);
32-
std::vector<TInsertWriteId> insertWriteIds;
30+
AFL_VERIFY(!!PackBehaviour);
3331
auto& granule = index.MutableGranuleVerified(Pack.GetPathId());
3432
for (auto&& portion : Pack.MutablePortions()) {
35-
if (packBehaviour == EOperationBehaviour::NoTxWrite) {
33+
if (PackBehaviour == EOperationBehaviour::NoTxWrite) {
3634
static TAtomicCounter Counter = 0;
3735
portion.GetPortionInfoConstructor()->MutablePortionConstructor().SetInsertWriteId((TInsertWriteId)Counter.Inc());
3836
} else {
3937
portion.GetPortionInfoConstructor()->MutablePortionConstructor().SetInsertWriteId(Self->InsertTable->BuildNextWriteId(txc));
4038
}
41-
insertWriteIds.emplace_back(portion.GetPortionInfoConstructor()->GetPortionConstructor().GetInsertWriteIdVerified());
39+
InsertWriteIds.emplace_back(portion.GetPortionInfoConstructor()->GetPortionConstructor().GetInsertWriteIdVerified());
4240
portion.Finalize(Self, txc);
43-
if (packBehaviour == EOperationBehaviour::NoTxWrite) {
41+
if (PackBehaviour == EOperationBehaviour::NoTxWrite) {
4442
granule.CommitImmediateOnExecute(txc, *CommitSnapshot, portion.GetPortionInfo());
4543
} else {
4644
granule.InsertPortionOnExecute(txc, portion.GetPortionInfo());
@@ -61,15 +59,11 @@ bool TTxBlobsWritingFinished::DoExecute(TTransactionContext& txc, const TActorCo
6159
AFL_VERIFY(Self->TablesManager.IsReadyForFinishWrite(writeMeta.GetTableId(), minReadSnapshot));
6260
Y_ABORT_UNLESS(operation->GetStatus() == EOperationStatus::Started);
6361
if (operation->GetBehaviour() == EOperationBehaviour::NoTxWrite) {
64-
if (operationIds.size() == 1) {
65-
operation->OnWriteFinish(txc, insertWriteIds, true);
66-
} else {
67-
operation->OnWriteFinish(txc, {}, true);
68-
}
62+
operation->OnWriteFinish(txc, {}, true);
6963
} else {
70-
operation->OnWriteFinish(txc, insertWriteIds, false);
64+
operation->OnWriteFinish(txc, InsertWriteIds, false);
7165
}
72-
Self->OperationsManager->LinkInsertWriteIdToOperationWriteId(insertWriteIds, operation->GetWriteId());
66+
Self->OperationsManager->LinkInsertWriteIdToOperationWriteId(InsertWriteIds, operation->GetWriteId());
7367
if (operation->GetBehaviour() == EOperationBehaviour::NoTxWrite) {
7468
auto ev = NEvents::TDataEvents::TEvWriteResult::BuildCompleted(Self->TabletID());
7569
Results.emplace_back(std::move(ev), writeMeta.GetSource(), operation->GetCookie());
@@ -121,6 +115,11 @@ void TTxBlobsWritingFinished::DoComplete(const TActorContext& ctx) {
121115
for (auto&& portion : Pack.GetPortions()) {
122116
granule.InsertPortionOnComplete(portion.GetPortionInfo(), index);
123117
}
118+
if (PackBehaviour == EOperationBehaviour::NoTxWrite) {
119+
for (auto&& i : InsertWriteIds) {
120+
granule.CommitPortionOnComplete(i, index);
121+
}
122+
}
124123
for (auto&& writeResult : Pack.GetWriteResults()) {
125124
const auto& writeMeta = writeResult.GetWriteMeta();
126125
auto op = Self->GetOperationsManager().GetOperationVerified((TOperationWriteId)writeMeta.GetWriteId());

ydb/core/tx/columnshard/blobs_action/transaction/tx_blobs_written.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,9 @@ class TTxBlobsWritingFinished: public TExtendedTransactionBase {
3737
}
3838
};
3939

40+
std::vector<TInsertWriteId> InsertWriteIds;
4041
std::vector<TReplyInfo> Results;
42+
std::optional<EOperationBehaviour> PackBehaviour;
4143

4244
public:
4345
TTxBlobsWritingFinished(TColumnShard* self, const NKikimrProto::EReplyStatus writeStatus,

0 commit comments

Comments
 (0)