Skip to content

Commit 6b61d54

Browse files
Merge de04db2 into c713cd6
2 parents c713cd6 + de04db2 commit 6b61d54

18 files changed

+153
-106
lines changed

ydb/core/tx/columnshard/blobs_action/transaction/tx_write_index.cpp

+5-4
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,11 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext& ctx)
2222
NOlap::TDbWrapper dbWrap(txc.DB, &dsGroupSelector);
2323
AFL_VERIFY(Self->TablesManager.MutablePrimaryIndex().ApplyChanges(dbWrap, changes, snapshot));
2424
LOG_S_DEBUG(TxPrefix() << "(" << changes->TypeString() << ") apply" << TxSuffix());
25-
NOlap::TWriteIndexContext context(txc, dbWrap);
26-
changes->WriteIndexOnExecute(*Self, context);
25+
NOlap::TWriteIndexContext context(&txc.DB, dbWrap, Self->MutableIndexAs<NOlap::TColumnEngineForLogs>());
26+
changes->WriteIndexOnExecute(Self, context);
2727

28-
changes->MutableBlobsAction().OnExecuteTxAfterAction(*Self, *context.BlobManagerDb, true);
28+
NOlap::TBlobManagerDb blobManagerDb(txc.DB);
29+
changes->MutableBlobsAction().OnExecuteTxAfterAction(*Self, blobManagerDb, true);
2930

3031
Self->UpdateIndexCounters();
3132
} else {
@@ -57,7 +58,7 @@ void TTxWriteIndex::Complete(const TActorContext& ctx) {
5758

5859
if (!Ev->Get()->IndexChanges->IsAborted()) {
5960
NOlap::TWriteIndexCompleteContext context(ctx, blobsWritten, bytesWritten, Ev->Get()->Duration, TriggerActivity, Self->MutableIndexAs<NOlap::TColumnEngineForLogs>());
60-
Ev->Get()->IndexChanges->WriteIndexOnComplete(*Self, context);
61+
Ev->Get()->IndexChanges->WriteIndexOnComplete(Self, context);
6162
}
6263

6364
if (Ev->Get()->GetPutStatus() == NKikimrProto::TRYLATER) {

ydb/core/tx/columnshard/engines/changes/abstract/abstract.cpp

+10-8
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ TConclusionStatus TColumnEngineChanges::ConstructBlobs(TConstructionContext& con
3535
return result;
3636
}
3737

38-
void TColumnEngineChanges::WriteIndexOnExecute(NColumnShard::TColumnShard& self, TWriteIndexContext& context) {
38+
void TColumnEngineChanges::WriteIndexOnExecute(NColumnShard::TColumnShard* self, TWriteIndexContext& context) {
3939
Y_ABORT_UNLESS(Stage != EStage::Aborted);
4040
Y_ABORT_UNLESS(Stage <= EStage::Written);
4141
Y_ABORT_UNLESS(Stage >= EStage::Compiled);
@@ -44,13 +44,15 @@ void TColumnEngineChanges::WriteIndexOnExecute(NColumnShard::TColumnShard& self,
4444
Stage = EStage::Written;
4545
}
4646

47-
void TColumnEngineChanges::WriteIndexOnComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context) {
48-
Y_ABORT_UNLESS(Stage == EStage::Written);
47+
void TColumnEngineChanges::WriteIndexOnComplete(NColumnShard::TColumnShard* self, TWriteIndexCompleteContext& context) {
48+
Y_ABORT_UNLESS(Stage == EStage::Written || !self);
4949
Stage = EStage::Finished;
5050
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "WriteIndexComplete")("type", TypeString())("success", context.FinishedSuccessfully);
5151
DoWriteIndexOnComplete(self, context);
52-
OnFinish(self, context);
53-
self.IncCounter(GetCounterIndex(context.FinishedSuccessfully));
52+
if (self) {
53+
OnFinish(*self, context);
54+
self->IncCounter(GetCounterIndex(context.FinishedSuccessfully));
55+
}
5456

5557
}
5658

@@ -105,10 +107,10 @@ void TColumnEngineChanges::OnFinish(NColumnShard::TColumnShard& self, TChangesFi
105107
DoOnFinish(self, context);
106108
}
107109

108-
TWriteIndexContext::TWriteIndexContext(NTabletFlatExecutor::TTransactionContext& txc, IDbWrapper& dbWrapper)
109-
: Txc(txc)
110-
, BlobManagerDb(std::make_shared<TBlobManagerDb>(txc.DB))
110+
TWriteIndexContext::TWriteIndexContext(NTable::TDatabase* db, IDbWrapper& dbWrapper, TColumnEngineForLogs& engineLogs)
111+
: DB(db)
111112
, DBWrapper(dbWrapper)
113+
, EngineLogs(engineLogs)
112114
{
113115

114116
}

ydb/core/tx/columnshard/engines/changes/abstract/abstract.h

+7-7
Original file line numberDiff line numberDiff line change
@@ -74,10 +74,10 @@ class TFinalizationContext: TNonCopyable {
7474

7575
class TWriteIndexContext: TNonCopyable {
7676
public:
77-
NTabletFlatExecutor::TTransactionContext& Txc;
78-
std::shared_ptr<TBlobManagerDb> BlobManagerDb;
77+
NTable::TDatabase* DB;
7978
IDbWrapper& DBWrapper;
80-
TWriteIndexContext(NTabletFlatExecutor::TTransactionContext& txc, IDbWrapper& dbWrapper);
79+
TColumnEngineForLogs& EngineLogs;
80+
TWriteIndexContext(NTable::TDatabase* db, IDbWrapper& dbWrapper, TColumnEngineForLogs& engineLogs);
8181
};
8282

8383
class TChangesFinishContext {
@@ -146,8 +146,8 @@ class TColumnEngineChanges {
146146
protected:
147147
virtual void DoDebugString(TStringOutput& out) const = 0;
148148
virtual void DoCompile(TFinalizationContext& context) = 0;
149-
virtual void DoWriteIndexOnExecute(NColumnShard::TColumnShard& self, TWriteIndexContext& context) = 0;
150-
virtual void DoWriteIndexOnComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context) = 0;
149+
virtual void DoWriteIndexOnExecute(NColumnShard::TColumnShard* self, TWriteIndexContext& context) = 0;
150+
virtual void DoWriteIndexOnComplete(NColumnShard::TColumnShard* self, TWriteIndexCompleteContext& context) = 0;
151151
virtual void DoOnFinish(NColumnShard::TColumnShard& self, TChangesFinishContext& context) = 0;
152152
virtual bool NeedConstruction() const {
153153
return true;
@@ -217,8 +217,8 @@ class TColumnEngineChanges {
217217
virtual TPortionInfoWithBlobs* GetWritePortionInfo(const ui32 index) = 0;
218218
virtual bool NeedWritePortion(const ui32 index) const = 0;
219219

220-
void WriteIndexOnExecute(NColumnShard::TColumnShard& self, TWriteIndexContext& context);
221-
void WriteIndexOnComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context);
220+
void WriteIndexOnExecute(NColumnShard::TColumnShard* self, TWriteIndexContext& context);
221+
void WriteIndexOnComplete(NColumnShard::TColumnShard* self, TWriteIndexCompleteContext& context);
222222

223223
void Compile(TFinalizationContext& context) noexcept;
224224

ydb/core/tx/columnshard/engines/changes/cleanup.cpp

+21-13
Original file line numberDiff line numberDiff line change
@@ -15,31 +15,39 @@ void TCleanupColumnEngineChanges::DoDebugString(TStringOutput& out) const {
1515
}
1616
}
1717

18-
void TCleanupColumnEngineChanges::DoWriteIndexOnExecute(NColumnShard::TColumnShard& self, TWriteIndexContext& context) {
19-
self.IncCounter(NColumnShard::COUNTER_PORTIONS_ERASED, PortionsToDrop.size());
18+
void TCleanupColumnEngineChanges::DoWriteIndexOnExecute(NColumnShard::TColumnShard* self, TWriteIndexContext& context) {
2019
THashSet<ui64> pathIds;
21-
for (auto&& p : PortionsToDrop) {
22-
p.RemoveFromDatabase(context.DBWrapper);
20+
if (self) {
21+
for (auto&& p : PortionsToDrop) {
22+
p.RemoveFromDatabase(context.DBWrapper);
2323

24-
auto removing = BlobsAction.GetRemoving(p);
25-
for (auto&& r : p.Records) {
26-
removing->DeclareRemove((TTabletId)self.TabletID(), r.BlobRange.BlobId);
24+
auto removing = BlobsAction.GetRemoving(p);
25+
for (auto&& r : p.Records) {
26+
removing->DeclareRemove((TTabletId)self->TabletID(), r.BlobRange.BlobId);
27+
}
28+
pathIds.emplace(p.GetPathId());
29+
}
30+
if (context.DB) {
31+
for (auto&& p : pathIds) {
32+
self->TablesManager.TryFinalizeDropPath(*context.DB, p);
33+
}
2734
}
28-
pathIds.emplace(p.GetPathId());
29-
self.IncCounter(NColumnShard::COUNTER_RAW_BYTES_ERASED, p.RawBytesSum());
30-
}
31-
for (auto&& p: pathIds) {
32-
self.TablesManager.TryFinalizeDropPath(context.Txc, p);
3335
}
3436
}
3537

36-
void TCleanupColumnEngineChanges::DoWriteIndexOnComplete(NColumnShard::TColumnShard& /*self*/, TWriteIndexCompleteContext& context) {
38+
void TCleanupColumnEngineChanges::DoWriteIndexOnComplete(NColumnShard::TColumnShard* self, TWriteIndexCompleteContext& context) {
3739
for (auto& portionInfo : PortionsToDrop) {
3840
if (!context.EngineLogs.ErasePortion(portionInfo)) {
3941
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "Cannot erase portion")("portion", portionInfo.DebugString());
4042
}
4143
}
4244
context.TriggerActivity = NeedRepeat ? NColumnShard::TBackgroundActivity::Cleanup() : NColumnShard::TBackgroundActivity::None();
45+
if (self) {
46+
self->IncCounter(NColumnShard::COUNTER_PORTIONS_ERASED, PortionsToDrop.size());
47+
for (auto&& p : PortionsToDrop) {
48+
self->IncCounter(NColumnShard::COUNTER_RAW_BYTES_ERASED, p.RawBytesSum());
49+
}
50+
}
4351
}
4452

4553
void TCleanupColumnEngineChanges::DoStart(NColumnShard::TColumnShard& self) {

ydb/core/tx/columnshard/engines/changes/cleanup.h

+2-2
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@ class TCleanupColumnEngineChanges: public TColumnEngineChanges {
99
THashMap<TString, THashSet<NOlap::TEvictedBlob>> BlobsToForget;
1010
THashMap<TString, std::vector<std::shared_ptr<TPortionInfo>>> StoragePortions;
1111
protected:
12-
virtual void DoWriteIndexOnComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context) override;
13-
virtual void DoWriteIndexOnExecute(NColumnShard::TColumnShard& self, TWriteIndexContext& context) override;
12+
virtual void DoWriteIndexOnComplete(NColumnShard::TColumnShard* self, TWriteIndexCompleteContext& context) override;
13+
virtual void DoWriteIndexOnExecute(NColumnShard::TColumnShard* self, TWriteIndexContext& context) override;
1414

1515
virtual void DoStart(NColumnShard::TColumnShard& self) override;
1616
virtual void DoOnFinish(NColumnShard::TColumnShard& self, TChangesFinishContext& context) override;

ydb/core/tx/columnshard/engines/changes/compaction.cpp

+4-2
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,11 @@ void TCompactColumnEngineChanges::DoStart(NColumnShard::TColumnShard& self) {
4444
GranuleMeta->OnCompactionStarted();
4545
}
4646

47-
void TCompactColumnEngineChanges::DoWriteIndexOnComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context) {
47+
void TCompactColumnEngineChanges::DoWriteIndexOnComplete(NColumnShard::TColumnShard* self, TWriteIndexCompleteContext& context) {
4848
TBase::DoWriteIndexOnComplete(self, context);
49-
self.IncCounter(NColumnShard::COUNTER_COMPACTION_TIME, context.Duration.MilliSeconds());
49+
if (self) {
50+
self->IncCounter(NColumnShard::COUNTER_COMPACTION_TIME, context.Duration.MilliSeconds());
51+
}
5052
}
5153

5254
void TCompactColumnEngineChanges::DoOnFinish(NColumnShard::TColumnShard& self, TChangesFinishContext& context) {

ydb/core/tx/columnshard/engines/changes/compaction.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ class TCompactColumnEngineChanges: public TChangesWithAppend {
1414
protected:
1515
std::shared_ptr<TGranuleMeta> GranuleMeta;
1616

17-
virtual void DoWriteIndexOnComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context) override;
17+
virtual void DoWriteIndexOnComplete(NColumnShard::TColumnShard* self, TWriteIndexCompleteContext& context) override;
1818

1919
virtual void DoStart(NColumnShard::TColumnShard& self) override;
2020
virtual void DoOnFinish(NColumnShard::TColumnShard& self, TChangesFinishContext& context) override;

ydb/core/tx/columnshard/engines/changes/general_compaction.cpp

+6-4
Original file line numberDiff line numberDiff line change
@@ -246,11 +246,13 @@ TConclusionStatus TGeneralCompactColumnEngineChanges::DoConstructBlobs(TConstruc
246246
return TConclusionStatus::Success();
247247
}
248248

249-
void TGeneralCompactColumnEngineChanges::DoWriteIndexOnComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context) {
249+
void TGeneralCompactColumnEngineChanges::DoWriteIndexOnComplete(NColumnShard::TColumnShard* self, TWriteIndexCompleteContext& context) {
250250
TBase::DoWriteIndexOnComplete(self, context);
251-
self.IncCounter(context.FinishedSuccessfully ? NColumnShard::COUNTER_SPLIT_COMPACTION_SUCCESS : NColumnShard::COUNTER_SPLIT_COMPACTION_FAIL);
252-
self.IncCounter(NColumnShard::COUNTER_SPLIT_COMPACTION_BLOBS_WRITTEN, context.BlobsWritten);
253-
self.IncCounter(NColumnShard::COUNTER_SPLIT_COMPACTION_BYTES_WRITTEN, context.BytesWritten);
251+
if (self) {
252+
self->IncCounter(context.FinishedSuccessfully ? NColumnShard::COUNTER_SPLIT_COMPACTION_SUCCESS : NColumnShard::COUNTER_SPLIT_COMPACTION_FAIL);
253+
self->IncCounter(NColumnShard::COUNTER_SPLIT_COMPACTION_BLOBS_WRITTEN, context.BlobsWritten);
254+
self->IncCounter(NColumnShard::COUNTER_SPLIT_COMPACTION_BYTES_WRITTEN, context.BytesWritten);
255+
}
254256
}
255257

256258
void TGeneralCompactColumnEngineChanges::DoStart(NColumnShard::TColumnShard& self) {

ydb/core/tx/columnshard/engines/changes/general_compaction.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ namespace NKikimr::NOlap::NCompaction {
77
class TGeneralCompactColumnEngineChanges: public TCompactColumnEngineChanges {
88
private:
99
using TBase = TCompactColumnEngineChanges;
10-
virtual void DoWriteIndexOnComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context) override;
10+
virtual void DoWriteIndexOnComplete(NColumnShard::TColumnShard* self, TWriteIndexCompleteContext& context) override;
1111
std::map<NIndexedReader::TSortableBatchPosition, bool> CheckPoints;
1212
void BuildAppendedPortionsByFullBatches(TConstructionContext& context) noexcept;
1313
void BuildAppendedPortionsByChunks(TConstructionContext& context) noexcept;

ydb/core/tx/columnshard/engines/changes/indexation.cpp

+17-13
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,13 @@
77

88
namespace NKikimr::NOlap {
99

10-
void TInsertColumnEngineChanges::DoWriteIndexOnExecute(NColumnShard::TColumnShard& self, TWriteIndexContext& context) {
10+
void TInsertColumnEngineChanges::DoWriteIndexOnExecute(NColumnShard::TColumnShard* self, TWriteIndexContext& context) {
1111
TBase::DoWriteIndexOnExecute(self, context);
12-
auto removing = BlobsAction.GetRemoving(IStoragesManager::DefaultStorageId);
13-
for (const auto& insertedData : DataToIndex) {
14-
self.InsertTable->EraseCommittedOnExecute(context.DBWrapper, insertedData, removing);
12+
if (self) {
13+
auto removing = BlobsAction.GetRemoving(IStoragesManager::DefaultStorageId);
14+
for (const auto& insertedData : DataToIndex) {
15+
self->InsertTable->EraseCommittedOnExecute(context.DBWrapper, insertedData, removing);
16+
}
1517
}
1618
}
1719

@@ -26,17 +28,19 @@ void TInsertColumnEngineChanges::DoStart(NColumnShard::TColumnShard& self) {
2628
self.BackgroundController.StartIndexing(*this);
2729
}
2830

29-
void TInsertColumnEngineChanges::DoWriteIndexOnComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context) {
31+
void TInsertColumnEngineChanges::DoWriteIndexOnComplete(NColumnShard::TColumnShard* self, TWriteIndexCompleteContext& context) {
3032
TBase::DoWriteIndexOnComplete(self, context);
31-
for (const auto& insertedData : DataToIndex) {
32-
self.InsertTable->EraseCommittedOnComplete(insertedData);
33-
}
34-
if (!DataToIndex.empty()) {
35-
self.UpdateInsertTableCounters();
33+
if (self) {
34+
for (const auto& insertedData : DataToIndex) {
35+
self->InsertTable->EraseCommittedOnComplete(insertedData);
36+
}
37+
if (!DataToIndex.empty()) {
38+
self->UpdateInsertTableCounters();
39+
}
40+
self->IncCounter(NColumnShard::COUNTER_INDEXING_BLOBS_WRITTEN, context.BlobsWritten);
41+
self->IncCounter(NColumnShard::COUNTER_INDEXING_BYTES_WRITTEN, context.BytesWritten);
42+
self->IncCounter(NColumnShard::COUNTER_INDEXING_TIME, context.Duration.MilliSeconds());
3643
}
37-
self.IncCounter(NColumnShard::COUNTER_INDEXING_BLOBS_WRITTEN, context.BlobsWritten);
38-
self.IncCounter(NColumnShard::COUNTER_INDEXING_BYTES_WRITTEN, context.BytesWritten);
39-
self.IncCounter(NColumnShard::COUNTER_INDEXING_TIME, context.Duration.MilliSeconds());
4044
}
4145

4246
void TInsertColumnEngineChanges::DoOnFinish(NColumnShard::TColumnShard& self, TChangesFinishContext& /*context*/) {

ydb/core/tx/columnshard/engines/changes/indexation.h

+2-2
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@ class TInsertColumnEngineChanges: public TChangesWithAppend {
1414
const TIndexInfo& indexInfo, const TInsertedData& inserted) const;
1515
std::vector<NOlap::TInsertedData> DataToIndex;
1616
protected:
17-
virtual void DoWriteIndexOnComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context) override;
18-
virtual void DoWriteIndexOnExecute(NColumnShard::TColumnShard& self, TWriteIndexContext& context) override;
17+
virtual void DoWriteIndexOnComplete(NColumnShard::TColumnShard* self, TWriteIndexCompleteContext& context) override;
18+
virtual void DoWriteIndexOnExecute(NColumnShard::TColumnShard* self, TWriteIndexContext& context) override;
1919

2020
virtual void DoStart(NColumnShard::TColumnShard& self) override;
2121
virtual void DoOnFinish(NColumnShard::TColumnShard& self, TChangesFinishContext& context) override;

0 commit comments

Comments
 (0)