Skip to content

Commit 47cd8c5

Browse files
Merge 3be78ef into c6e9a17
2 parents c6e9a17 + 3be78ef commit 47cd8c5

File tree

17 files changed

+378
-46
lines changed

17 files changed

+378
-46
lines changed

ydb/core/tx/columnshard/columnshard__init.cpp

Lines changed: 13 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -126,8 +126,8 @@ class TTxUpdateSchema: public TTransactionBase<TColumnShard> {
126126
};
127127

128128
bool TTxUpdateSchema::Execute(TTransactionContext& txc, const TActorContext&) {
129-
NActors::TLogContextGuard gLogging =
130-
NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("event", "initialize_shard");
129+
NActors::TLogContextGuard gLogging = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())(
130+
"process", "TTxUpdateSchema::Execute");
131131
ACFL_INFO("step", "TTxUpdateSchema.Execute_Start")("details", Self->NormalizerController.DebugString());
132132

133133
while (!Self->NormalizerController.IsNormalizationFinished()) {
@@ -153,6 +153,8 @@ bool TTxUpdateSchema::Execute(TTransactionContext& txc, const TActorContext&) {
153153
}
154154

155155
void TTxUpdateSchema::Complete(const TActorContext& ctx) {
156+
NActors::TLogContextGuard gLogging =
157+
NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("process", "TTxUpdateSchema::Complete");
156158
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("step", "TTxUpdateSchema.Complete");
157159
Self->Counters.GetCSCounters().Initialization.OnTxUpdateSchemaFinished(TMonotonic::Now() - StartInstant);
158160
if (NormalizerTasks.empty()) {
@@ -190,26 +192,26 @@ class TTxApplyNormalizer: public TTransactionBase<TColumnShard> {
190192

191193
bool TTxApplyNormalizer::Execute(TTransactionContext& txc, const TActorContext&) {
192194
NActors::TLogContextGuard gLogging =
193-
NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("event", "initialize_shard");
195+
NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("event", "TTxApplyNormalizer::Execute");
194196
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("step", "TTxApplyNormalizer.Execute")("details", Self->NormalizerController.DebugString());
195197
if (!Changes->ApplyOnExecute(txc, Self->NormalizerController)) {
196198
return false;
197199
}
198200

199-
if (Self->NormalizerController.GetNormalizer()->GetActiveTasksCount() == 1) {
201+
if (Self->NormalizerController.GetNormalizer()->DecActiveCounters() == 1) {
200202
NIceDb::TNiceDb db(txc.DB);
201203
Self->NormalizerController.OnNormalizerFinished(db);
202204
}
203205
return true;
204206
}
205207

206208
void TTxApplyNormalizer::Complete(const TActorContext& ctx) {
207-
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("step", "TTxApplyNormalizer.Complete")("tablet_id", Self->TabletID())("event", "initialize_shard");
209+
NActors::TLogContextGuard gLogging = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())(
210+
"event", "TTxApplyNormalizer::Complete");
208211
AFL_VERIFY(!Self->NormalizerController.IsNormalizationFinished())("details", Self->NormalizerController.DebugString());
209-
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("event", "apply_normalizer_changes")(
212+
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "apply_normalizer_changes")(
210213
"details", Self->NormalizerController.DebugString())("size", Changes->GetSize());
211214
Changes->ApplyOnComplete(Self->NormalizerController);
212-
Self->NormalizerController.GetNormalizer()->OnResultReady();
213215
if (Self->NormalizerController.GetNormalizer()->HasActiveTasks()) {
214216
return;
215217
}
@@ -240,6 +242,8 @@ class TTxInitSchema: public TTransactionBase<TColumnShard> {
240242
};
241243

242244
bool TTxInitSchema::Execute(TTransactionContext& txc, const TActorContext&) {
245+
NActors::TLogContextGuard gLogging = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())(
246+
"process", "TTxInitSchema::Execute");
243247
LOG_S_DEBUG("TxInitSchema.Execute at tablet " << Self->TabletID());
244248

245249
const bool isFirstRun = txc.DB.GetScheme().IsEmpty();
@@ -286,6 +290,8 @@ bool TTxInitSchema::Execute(TTransactionContext& txc, const TActorContext&) {
286290
}
287291

288292
void TTxInitSchema::Complete(const TActorContext& ctx) {
293+
NActors::TLogContextGuard gLogging =
294+
NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("process", "TTxInitSchema::Complete");
289295
Self->Counters.GetCSCounters().Initialization.OnTxInitSchemaFinished(TMonotonic::Now() - StartInstant);
290296
LOG_S_DEBUG("TxInitSchema.Complete at tablet " << Self->TabletID(););
291297
Self->Execute(new TTxUpdateSchema(Self), ctx);

ydb/core/tx/columnshard/columnshard_impl.cpp

Lines changed: 13 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -1279,9 +1279,17 @@ class TTxAskPortionChunks: public TTransactionBase<TColumnShard> {
12791279
bool reask = false;
12801280
for (auto&& i : PortionsByPath) {
12811281
for (auto&& p : i.second) {
1282-
auto rowset = db.Table<NColumnShard::Schema::IndexColumnsV1>().Prefix(p->GetPathId(), p->GetPortionId()).Select();
1283-
if (!rowset.IsReady()) {
1284-
reask = true;
1282+
{
1283+
auto rowset = db.Table<NColumnShard::Schema::IndexColumnsV2>().Prefix(p->GetPathId(), p->GetPortionId()).Select();
1284+
if (!rowset.IsReady()) {
1285+
reask = true;
1286+
}
1287+
}
1288+
{
1289+
auto rowset = db.Table<NColumnShard::Schema::IndexIndexes>().Prefix(p->GetPathId(), p->GetPortionId()).Select();
1290+
if (!rowset.IsReady()) {
1291+
reask = true;
1292+
}
12851293
}
12861294
}
12871295
}
@@ -1295,12 +1303,12 @@ class TTxAskPortionChunks: public TTransactionBase<TColumnShard> {
12951303
std::vector<NOlap::TColumnChunkLoadContextV1> records;
12961304
std::vector<NOlap::TIndexChunkLoadContext> indexes;
12971305
{
1298-
auto rowset = db.Table<NColumnShard::Schema::IndexColumnsV1>().Prefix(p->GetPathId(), p->GetPortionId()).Select();
1306+
auto rowset = db.Table<NColumnShard::Schema::IndexColumnsV2>().Prefix(p->GetPathId(), p->GetPortionId()).Select();
12991307
if (!rowset.IsReady()) {
13001308
return false;
13011309
}
13021310
while (!rowset.EndOfSet()) {
1303-
records.emplace_back(NOlap::TColumnChunkLoadContextV1(rowset));
1311+
NOlap::TColumnChunkLoadContextV1::BuildFromDBV2(rowset, records);
13041312
if (!rowset.Next()) {
13051313
return false;
13061314
}

ydb/core/tx/columnshard/columnshard_schema.h

Lines changed: 34 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -569,6 +569,15 @@ struct Schema : NIceDb::Schema {
569569
using TColumns = TableColumns<PathId, PortionId, SSColumnId, ChunkIdx, Metadata, BlobIdx, Offset, Size>;
570570
};
571571

572+
// Table schema keyed by (PathId, PortionId) with a single String `Metadata` column.
// TColumnChunkLoadContextV1::BuildFromDBV2 reads rows of this table and parses
// `Metadata` as a serialized NKikimrTxColumnShard::TIndexPortionAccessor.
// NOTE(review): the table id used here is ColumnsV1TableId; if IndexColumnsV1 is
// registered under the same id the two schemas would alias one local table — confirm
// whether a dedicated id was intended.
struct IndexColumnsV2: Table<ColumnsV1TableId> {
573+
struct PathId: Column<1, NScheme::NTypeIds::Uint64> {};
574+
struct PortionId: Column<2, NScheme::NTypeIds::Uint64> {};
575+
// Serialized protobuf blob (see BuildFromDBV2 for the expected message type).
struct Metadata: Column<3, NScheme::NTypeIds::String> {};
576+
577+
using TKey = TableKey<PathId, PortionId>;
578+
using TColumns = TableColumns<PathId, PortionId, Metadata>;
579+
};
580+
572581
using TTables = SchemaTables<
573582
Value,
574583
TxInfo,
@@ -607,7 +616,8 @@ struct Schema : NIceDb::Schema {
607616
TxDependencies,
608617
TxStates,
609618
TxEvents,
610-
IndexColumnsV1
619+
IndexColumnsV1,
620+
IndexColumnsV2
611621
>;
612622

613623
//
@@ -997,6 +1007,29 @@ class TColumnChunkLoadContextV1 {
9971007
YDB_READONLY_DEF(NKikimrTxColumnShard::TIndexColumnMeta, MetaProto);
9981008

9991009
public:
1010+
// Expands the current IndexColumnsV2 row of `rowset` into per-chunk load contexts.
// Reads PathId, PortionId and Metadata from the row, parses Metadata as a
// NKikimrTxColumnShard::TIndexPortionAccessor and appends one
// TColumnChunkLoadContextV1 to `records` for every entry of its Chunks field.
// `records` is only appended to; existing elements are left in place.
template <class TSource>
1011+
static void BuildFromDBV2(const TSource& rowset, std::vector<TColumnChunkLoadContextV1>& records) {
1012+
const ui64 pathId = rowset.template GetValue<NColumnShard::Schema::IndexColumnsV2::PathId>();
1013+
const ui64 portionId = rowset.template GetValue<NColumnShard::Schema::IndexColumnsV2::PortionId>();
1014+
const TString metadata = rowset.template GetValue<NColumnShard::Schema::IndexColumnsV2::Metadata>();
1015+
NKikimrTxColumnShard::TIndexPortionAccessor metaProto;
1016+
// The parse result is checked with AFL_VERIFY (presumably fatal on failure — confirm).
AFL_VERIFY(metaProto.ParseFromArray(metadata.data(), metadata.size()))("event", "cannot parse metadata as protobuf");
1017+
for (auto&& i : metaProto.GetChunks()) {
1018+
// Path and portion ids come from the row key; the chunk address, blob range link
// and chunk metadata come from the parsed proto entry.
// NOTE(review): the BuildFromProto result is unwrapped with DetachResult() without
// a visible check here — confirm how a failed conversion is handled.
TColumnChunkLoadContextV1 result(pathId, portionId, TChunkAddress(i.GetSSColumnId(), i.GetChunkIdx()),
1019+
TBlobRangeLink16::BuildFromProto(i.GetBlobRangeLink()).DetachResult(), i.GetMetadata());
1020+
records.emplace_back(std::move(result));
1021+
}
1022+
}
1023+
1024+
// Builds a NKikimrTxColumnShard::TColumnChunkInfo from this load context:
// column id and chunk index from Address, chunk metadata from MetaProto and the
// blob range link from BlobRange. These are the same fields BuildFromDBV2 reads
// back from each chunk entry.
NKikimrTxColumnShard::TColumnChunkInfo SerializeToDBProto() const {
1025+
NKikimrTxColumnShard::TColumnChunkInfo proto;
1026+
proto.SetSSColumnId(Address.GetColumnId());
1027+
proto.SetChunkIdx(Address.GetChunkIdx());
1028+
*proto.MutableMetadata() = MetaProto;
1029+
*proto.MutableBlobRangeLink() = BlobRange.SerializeToProto();
1030+
return proto;
1031+
}
1032+
10001033
TFullChunkAddress GetFullChunkAddress() const {
10011034
return TFullChunkAddress(PathId, PortionId, Address.GetEntityId(), Address.GetChunkIdx());
10021035
}

ydb/core/tx/columnshard/engines/changes/actualization/construction/context.cpp

Lines changed: 7 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -6,8 +6,10 @@
66
namespace NKikimr::NOlap::NActualizer {
77

88
TTieringProcessContext::TTieringProcessContext(const ui64 memoryUsageLimit, const TSaverContext& saverContext,
9-
const std::shared_ptr<NDataLocks::TManager>& dataLocksManager, const NColumnShard::TEngineLogsCounters& counters, const std::shared_ptr<TController>& controller)
10-
: MemoryUsageLimit(memoryUsageLimit)
9+
const std::shared_ptr<NDataLocks::TManager>& dataLocksManager, const TVersionedIndex& versionedIndex,
10+
const NColumnShard::TEngineLogsCounters& counters, const std::shared_ptr<TController>& controller)
11+
: VersionedIndex(versionedIndex)
12+
, MemoryUsageLimit(memoryUsageLimit)
1113
, SaverContext(saverContext)
1214
, Counters(counters)
1315
, Controller(controller)
@@ -31,10 +33,10 @@ bool TTieringProcessContext::AddPortion(
3133
};
3234
auto it = Tasks.find(features.GetRWAddress());
3335
if (it == Tasks.end()) {
34-
std::vector<TTaskConstructor> tasks = {buildNewTask()};
36+
std::vector<TTaskConstructor> tasks = { buildNewTask() };
3537
it = Tasks.emplace(features.GetRWAddress(), std::move(tasks)).first;
3638
}
37-
if (it->second.back().GetTxWriteVolume() + info->GetTxVolume() > TGlobalLimits::TxWriteLimitBytes / 2 && it->second.back().GetTxWriteVolume()) {
39+
if (!it->second.back().CanTakePortionInTx(info, VersionedIndex)) {
3840
if (Controller->IsNewTaskAvailable(it->first, it->second.size())) {
3941
it->second.emplace_back(buildNewTask());
4042
} else {
@@ -53,7 +55,7 @@ bool TTieringProcessContext::AddPortion(
5355
}
5456
it->second.back().MutableMemoryUsage() = it->second.back().GetMemoryPredictor()->AddPortion(info);
5557
}
56-
it->second.back().MutableTxWriteVolume() += info->GetTxVolume();
58+
it->second.back().TakePortionInTx(info, VersionedIndex);
5759
if (features.GetTargetTierName() == NTiering::NCommon::DeleteTierName) {
5860
AFL_VERIFY(dWait);
5961
Counters.OnPortionToDrop(info->GetTotalBlobBytes(), *dWait);

ydb/core/tx/columnshard/engines/changes/actualization/construction/context.h

Lines changed: 20 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -15,17 +15,34 @@ class TTaskConstructor {
1515
YDB_READONLY_DEF(std::shared_ptr<TColumnEngineChanges::IMemoryPredictor>, MemoryPredictor);
1616
YDB_READONLY_DEF(std::shared_ptr<TTTLColumnEngineChanges>, Task);
1717
YDB_ACCESSOR(ui64, MemoryUsage, 0);
18-
YDB_ACCESSOR(ui64, TxWriteVolume, 0);
18+
YDB_READONLY(ui64, PortionsCount, 0);
19+
YDB_READONLY(ui64, ChunksCount, 0);
20+
1921
public:
2022
TTaskConstructor(const std::shared_ptr<TColumnEngineChanges::IMemoryPredictor>& predictor, const std::shared_ptr<TTTLColumnEngineChanges>& task)
2123
: MemoryPredictor(predictor)
2224
, Task(task) {
2325

2426
}
27+
28+
// Reports whether `portion` can still be added to this task.
// An empty task (PortionsCount == 0) always accepts a portion. Otherwise the task
// accepts it only while the portion count after adding stays below 1000 and the
// accumulated approximate chunk count (including this portion) stays below 100000.
// The same numeric limits appear as maxPortionsCount / maxChunksCount in
// column_engine_logs.cpp within this diff.
// NOTE(review): the method reads but does not modify members, yet is not declared const.
bool CanTakePortionInTx(const TPortionInfo::TConstPtr& portion, const TVersionedIndex& index) {
29+
if (!PortionsCount) {
30+
return true;
31+
}
32+
// The chunk estimate is derived from the column count of the portion's schema
// resolved through `index`.
return
33+
(PortionsCount + 1 < 1000) &&
34+
(ChunksCount + portion->GetApproxChunksCount(portion->GetSchema(index)->GetColumnsCount()) < 100000);
35+
}
36+
37+
// Accounts for `portion` in this task: increments PortionsCount and adds the
// portion's approximate chunk count (same estimate as CanTakePortionInTx uses)
// to ChunksCount. Performs no limit check itself.
void TakePortionInTx(const TPortionInfo::TConstPtr& portion, const TVersionedIndex& index) {
38+
++PortionsCount;
39+
ChunksCount += portion->GetApproxChunksCount(portion->GetSchema(index)->GetColumnsCount());
40+
}
2541
};
2642

2743
class TTieringProcessContext {
2844
private:
45+
const TVersionedIndex& VersionedIndex;
2946
THashSet<TPortionAddress> UsedPortions;
3047
const ui64 MemoryUsageLimit;
3148
TSaverContext SaverContext;
@@ -63,7 +80,8 @@ class TTieringProcessContext {
6380
}
6481
}
6582

66-
TTieringProcessContext(const ui64 memoryUsageLimit, const TSaverContext& saverContext, const std::shared_ptr<NDataLocks::TManager>& dataLocksManager,
83+
TTieringProcessContext(const ui64 memoryUsageLimit, const TSaverContext& saverContext,
84+
const std::shared_ptr<NDataLocks::TManager>& dataLocksManager, const TVersionedIndex& versionedIndex,
6785
const NColumnShard::TEngineLogsCounters& counters, const std::shared_ptr<TController>& controller);
6886
};
6987

ydb/core/tx/columnshard/engines/column_engine_logs.cpp

Lines changed: 11 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -316,11 +316,13 @@ std::shared_ptr<TCleanupPortionsColumnEngineChanges> TColumnEngineForLogs::Start
316316
std::shared_ptr<TCleanupPortionsColumnEngineChanges> changes = std::make_shared<TCleanupPortionsColumnEngineChanges>(StoragesManager);
317317

318318
// Add all portions from dropped paths
319-
ui64 txSize = 0;
320-
const ui64 txSizeLimit = TGlobalLimits::TxWriteLimitBytes / 4;
319+
ui64 portionsCount = 0;
320+
ui64 chunksCount = 0;
321321
ui32 skipLocked = 0;
322322
ui32 portionsFromDrop = 0;
323323
bool limitExceeded = false;
324+
const ui32 maxChunksCount = 100000;
325+
const ui32 maxPortionsCount = 1000;
324326
for (ui64 pathId : pathsToDrop) {
325327
auto g = GranulesStorage->GetGranuleOptional(pathId);
326328
if (!g) {
@@ -335,8 +337,9 @@ std::shared_ptr<TCleanupPortionsColumnEngineChanges> TColumnEngineForLogs::Start
335337
++skipLocked;
336338
continue;
337339
}
338-
if (txSize + info->GetTxVolume() < txSizeLimit || changes->GetPortionsToDrop().empty()) {
339-
txSize += info->GetTxVolume();
340+
++portionsCount;
341+
chunksCount += info->GetApproxChunksCount(info->GetSchema(VersionedIndex)->GetColumnsCount());
342+
if ((portionsCount < maxPortionsCount && chunksCount < maxChunksCount) || changes->GetPortionsToDrop().empty()) {
340343
} else {
341344
limitExceeded = true;
342345
break;
@@ -360,8 +363,9 @@ std::shared_ptr<TCleanupPortionsColumnEngineChanges> TColumnEngineForLogs::Start
360363
continue;
361364
}
362365
AFL_VERIFY(it->second[i]->CheckForCleanup(snapshot))("p_snapshot", it->second[i]->GetRemoveSnapshotOptional())("snapshot", snapshot);
363-
if (txSize + it->second[i]->GetTxVolume() < txSizeLimit || changes->GetPortionsToDrop().empty()) {
364-
txSize += it->second[i]->GetTxVolume();
366+
++portionsCount;
367+
chunksCount += it->second[i]->GetApproxChunksCount(it->second[i]->GetSchema(VersionedIndex)->GetColumnsCount());
368+
if ((portionsCount < maxPortionsCount && chunksCount < maxChunksCount) || changes->GetPortionsToDrop().empty()) {
365369
} else {
366370
limitExceeded = true;
367371
break;
@@ -397,7 +401,7 @@ std::vector<std::shared_ptr<TTTLColumnEngineChanges>> TColumnEngineForLogs::Star
397401
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "StartTtl")("external", pathEviction.size());
398402

399403
TSaverContext saverContext(StoragesManager);
400-
NActualizer::TTieringProcessContext context(memoryUsageLimit, saverContext, dataLocksManager, SignalCounters, ActualizationController);
404+
NActualizer::TTieringProcessContext context(memoryUsageLimit, saverContext, dataLocksManager, VersionedIndex, SignalCounters, ActualizationController);
401405
const TDuration actualizationLag = NYDBTest::TControllers::GetColumnShardController()->GetActualizationTasksLag();
402406
for (auto&& i : pathEviction) {
403407
auto g = GetGranuleOptional(i.first);

ydb/core/tx/columnshard/engines/portions/column_record.h

Lines changed: 8 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -121,6 +121,14 @@ class TColumnRecord {
121121
return BlobRange;
122122
}
123123

124+
// Builds a NKikimrTxColumnShard::TColumnChunkInfo for this column record:
// entity id as SSColumnId, chunk index, serialized chunk meta and the blob range link.
// TPortionDataAccessor::SaveToDatabase collects these into a TIndexPortionAccessor.
NKikimrTxColumnShard::TColumnChunkInfo SerializeToDBProto() const {
125+
NKikimrTxColumnShard::TColumnChunkInfo result;
126+
result.SetSSColumnId(GetEntityId());
127+
result.SetChunkIdx(GetChunkIdx());
128+
// NOTE(review): Metadata is assigned through SetMetadata(...) here, whereas
// TColumnChunkLoadContextV1::SerializeToDBProto assigns it through MutableMetadata() —
// confirm the field's type supports this accessor.
result.SetMetadata(Meta.SerializeToProto());
129+
*result.MutableBlobRangeLink() = BlobRange.SerializeToProto();
130+
return result;
131+
}
124132
NKikimrColumnShardDataSharingProto::TColumnRecord SerializeToProto() const;
125133
static TConclusion<TColumnRecord> BuildFromProto(const NKikimrColumnShardDataSharingProto::TColumnRecord& proto) {
126134
TColumnRecord result;

ydb/core/tx/columnshard/engines/portions/data_accessor.cpp

Lines changed: 7 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -501,6 +501,12 @@ void TPortionDataAccessor::SaveToDatabase(IDbWrapper& db, const ui32 firstPKColu
501501
FullValidation();
502502
db.WritePortion(*PortionInfo);
503503
if (!saveOnlyMeta) {
504+
NKikimrTxColumnShard::TIndexPortionAccessor protoData;
505+
for (auto& record : GetRecordsVerified()) {
506+
*protoData.AddChunks() = record.SerializeToDBProto();
507+
}
508+
db.WriteColumns(*PortionInfo, std::move(protoData));
509+
504510
for (auto& record : GetRecordsVerified()) {
505511
db.WriteColumn(*PortionInfo, record, firstPKColumnId);
506512
}
@@ -533,7 +539,7 @@ void TPortionDataAccessor::FullValidation() const {
533539
blobIdxs.emplace(bRange->GetBlobIdxVerified());
534540
}
535541
}
536-
AFL_VERIFY(blobIdxs.size());
542+
AFL_VERIFY(blobIdxs.size())("portion_info", PortionInfo->DebugString());
537543
AFL_VERIFY(PortionInfo->GetBlobIdsCount() == blobIdxs.size());
538544
AFL_VERIFY(PortionInfo->GetBlobIdsCount() == *blobIdxs.rbegin() + 1);
539545
}

ydb/core/tx/columnshard/engines/portions/portion_info.cpp

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -50,8 +50,8 @@ ui64 TPortionInfo::GetMetadataMemorySize() const {
5050
return sizeof(TPortionInfo) - sizeof(TPortionMeta) + Meta.GetMetadataMemorySize();
5151
}
5252

53-
ui64 TPortionInfo::GetTxVolume() const {
54-
return 1024;
53+
ui64 TPortionInfo::GetApproxChunksCount(const ui32 schemaColumnsCount) const {
54+
return schemaColumnsCount * 256 * (GetRecordsCount() / 10000 + 1);
5555
}
5656

5757
void TPortionInfo::SerializeToProto(NKikimrColumnShardDataSharingProto::TPortionInfo& proto) const {

ydb/core/tx/columnshard/engines/portions/portion_info.h

Lines changed: 5 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -221,7 +221,11 @@ class TPortionInfo {
221221
const TString& GetIndexStorageId(const ui32 columnId, const TIndexInfo& indexInfo) const;
222222
const TString& GetEntityStorageId(const ui32 entityId, const TIndexInfo& indexInfo) const;
223223

224-
ui64 GetTxVolume() const; // fake-correct method for determ volume on rewrite this portion in transaction progress
224+
// Returns a fixed value of 1024 regardless of the portion's contents.
// The declaration this replaces described it as a "fake-correct" estimate of the
// volume of rewriting this portion within a transaction.
ui64 GetTxVolume() const {
225+
return 1024;
226+
}
227+
228+
ui64 GetApproxChunksCount(const ui32 schemaColumnsCount) const;
225229
ui64 GetMetadataMemorySize() const;
226230

227231
void SerializeToProto(NKikimrColumnShardDataSharingProto::TPortionInfo& proto) const;

0 commit comments

Comments
 (0)