Skip to content

Commit 6a18bdc

Browse files
Merge 5cb91b2 into f00baa4
2 parents f00baa4 + 5cb91b2 commit 6a18bdc

File tree

24 files changed

+441
-52
lines changed

24 files changed

+441
-52
lines changed

ydb/core/tx/columnshard/columnshard__init.cpp

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -126,8 +126,8 @@ class TTxUpdateSchema: public TTransactionBase<TColumnShard> {
126126
};
127127

128128
bool TTxUpdateSchema::Execute(TTransactionContext& txc, const TActorContext&) {
129-
NActors::TLogContextGuard gLogging =
130-
NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("event", "initialize_shard");
129+
NActors::TLogContextGuard gLogging = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())(
130+
"process", "TTxUpdateSchema::Execute");
131131
ACFL_INFO("step", "TTxUpdateSchema.Execute_Start")("details", Self->NormalizerController.DebugString());
132132

133133
while (!Self->NormalizerController.IsNormalizationFinished()) {
@@ -153,6 +153,8 @@ bool TTxUpdateSchema::Execute(TTransactionContext& txc, const TActorContext&) {
153153
}
154154

155155
void TTxUpdateSchema::Complete(const TActorContext& ctx) {
156+
NActors::TLogContextGuard gLogging =
157+
NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("process", "TTxUpdateSchema::Complete");
156158
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("step", "TTxUpdateSchema.Complete");
157159
Self->Counters.GetCSCounters().Initialization.OnTxUpdateSchemaFinished(TMonotonic::Now() - StartInstant);
158160
if (NormalizerTasks.empty()) {
@@ -185,32 +187,34 @@ class TTxApplyNormalizer: public TTransactionBase<TColumnShard> {
185187
}
186188

187189
private:
190+
bool NormalizerFinished = false;
188191
NOlap::INormalizerChanges::TPtr Changes;
189192
};
190193

191194
bool TTxApplyNormalizer::Execute(TTransactionContext& txc, const TActorContext&) {
192195
NActors::TLogContextGuard gLogging =
193-
NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("event", "initialize_shard");
196+
NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("event", "TTxApplyNormalizer::Execute");
194197
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("step", "TTxApplyNormalizer.Execute")("details", Self->NormalizerController.DebugString());
195198
if (!Changes->ApplyOnExecute(txc, Self->NormalizerController)) {
196199
return false;
197200
}
198201

199-
if (Self->NormalizerController.GetNormalizer()->GetActiveTasksCount() == 1) {
202+
if (Self->NormalizerController.GetNormalizer()->DecActiveCounters() == 0) {
203+
NormalizerFinished = true;
200204
NIceDb::TNiceDb db(txc.DB);
201205
Self->NormalizerController.OnNormalizerFinished(db);
202206
}
203207
return true;
204208
}
205209

206210
void TTxApplyNormalizer::Complete(const TActorContext& ctx) {
207-
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("step", "TTxApplyNormalizer.Complete")("tablet_id", Self->TabletID())("event", "initialize_shard");
211+
NActors::TLogContextGuard gLogging = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())(
212+
"event", "TTxApplyNormalizer::Complete");
208213
AFL_VERIFY(!Self->NormalizerController.IsNormalizationFinished())("details", Self->NormalizerController.DebugString());
209-
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("event", "apply_normalizer_changes")(
214+
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "apply_normalizer_changes")(
210215
"details", Self->NormalizerController.DebugString())("size", Changes->GetSize());
211216
Changes->ApplyOnComplete(Self->NormalizerController);
212-
Self->NormalizerController.GetNormalizer()->OnResultReady();
213-
if (Self->NormalizerController.GetNormalizer()->HasActiveTasks()) {
217+
if (!NormalizerFinished) {
214218
return;
215219
}
216220

@@ -240,6 +244,8 @@ class TTxInitSchema: public TTransactionBase<TColumnShard> {
240244
};
241245

242246
bool TTxInitSchema::Execute(TTransactionContext& txc, const TActorContext&) {
247+
NActors::TLogContextGuard gLogging = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())(
248+
"process", "TTxInitSchema::Execute");
243249
LOG_S_DEBUG("TxInitSchema.Execute at tablet " << Self->TabletID());
244250

245251
const bool isFirstRun = txc.DB.GetScheme().IsEmpty();
@@ -286,6 +292,8 @@ bool TTxInitSchema::Execute(TTransactionContext& txc, const TActorContext&) {
286292
}
287293

288294
void TTxInitSchema::Complete(const TActorContext& ctx) {
295+
NActors::TLogContextGuard gLogging =
296+
NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("process", "TTxInitSchema::Complete");
289297
Self->Counters.GetCSCounters().Initialization.OnTxInitSchemaFinished(TMonotonic::Now() - StartInstant);
290298
LOG_S_DEBUG("TxInitSchema.Complete at tablet " << Self->TabletID(););
291299
Self->Execute(new TTxUpdateSchema(Self), ctx);

ydb/core/tx/columnshard/columnshard_impl.cpp

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1278,10 +1278,19 @@ class TTxAskPortionChunks: public TTransactionBase<TColumnShard> {
12781278
TBlobGroupSelector selector(Self->Info());
12791279
bool reask = false;
12801280
for (auto&& i : PortionsByPath) {
1281+
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "TTxAskPortionChunks::Execute")("size", i.second.size())("path_id", i.first);
12811282
for (auto&& p : i.second) {
1282-
auto rowset = db.Table<NColumnShard::Schema::IndexColumnsV1>().Prefix(p->GetPathId(), p->GetPortionId()).Select();
1283-
if (!rowset.IsReady()) {
1284-
reask = true;
1283+
{
1284+
auto rowset = db.Table<NColumnShard::Schema::IndexColumnsV2>().Prefix(p->GetPathId(), p->GetPortionId()).Select();
1285+
if (!rowset.IsReady()) {
1286+
reask = true;
1287+
}
1288+
}
1289+
{
1290+
auto rowset = db.Table<NColumnShard::Schema::IndexIndexes>().Prefix(p->GetPathId(), p->GetPortionId()).Select();
1291+
if (!rowset.IsReady()) {
1292+
reask = true;
1293+
}
12851294
}
12861295
}
12871296
}
@@ -1290,17 +1299,18 @@ class TTxAskPortionChunks: public TTransactionBase<TColumnShard> {
12901299
}
12911300

12921301
for (auto&& i : PortionsByPath) {
1302+
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "TTxAskPortionChunks::Execute")("stage", "processing")("size", i.second.size())("path_id", i.first);
12931303
while (i.second.size()) {
12941304
auto p = i.second.back();
12951305
std::vector<NOlap::TColumnChunkLoadContextV1> records;
12961306
std::vector<NOlap::TIndexChunkLoadContext> indexes;
12971307
{
1298-
auto rowset = db.Table<NColumnShard::Schema::IndexColumnsV1>().Prefix(p->GetPathId(), p->GetPortionId()).Select();
1308+
auto rowset = db.Table<NColumnShard::Schema::IndexColumnsV2>().Prefix(p->GetPathId(), p->GetPortionId()).Select();
12991309
if (!rowset.IsReady()) {
13001310
return false;
13011311
}
13021312
while (!rowset.EndOfSet()) {
1303-
records.emplace_back(NOlap::TColumnChunkLoadContextV1(rowset));
1313+
NOlap::TColumnChunkLoadContextV1::BuildFromDBV2(rowset, records);
13041314
if (!rowset.Next()) {
13051315
return false;
13061316
}
@@ -1321,8 +1331,11 @@ class TTxAskPortionChunks: public TTransactionBase<TColumnShard> {
13211331
FetchedAccessors.emplace_back(NOlap::TPortionAccessorConstructor::BuildForLoading(p, std::move(records), std::move(indexes)));
13221332
i.second.pop_back();
13231333
}
1334+
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "TTxAskPortionChunks::Execute")("stage", "finished")("size", i.second.size())(
1335+
"path_id", i.first);
13241336
}
13251337

1338+
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "TTxAskPortionChunks::Execute")("stage", "finished");
13261339
FetchCallback->OnAccessorsFetched(std::move(FetchedAccessors));
13271340
return true;
13281341
}
@@ -1439,7 +1452,9 @@ void TColumnShard::Enqueue(STFUNC_SIG) {
14391452
switch (ev->GetTypeRewrite()) {
14401453
HFunc(TEvPrivate::TEvTieringModified, Handle);
14411454
HFunc(TEvPrivate::TEvNormalizerResult, Handle);
1455+
HFunc(NOlap::NDataAccessorControl::TEvAskTabletDataAccessors, Handle);
14421456
default:
1457+
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "unexpected event in enqueue");
14431458
return NTabletFlatExecutor::TTabletExecutedFlat::Enqueue(ev);
14441459
}
14451460
}

ydb/core/tx/columnshard/columnshard_schema.h

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,8 @@ struct Schema : NIceDb::Schema {
5656
RepairsTableId,
5757
NormalizersTableId,
5858
NormalizerEventsTableId,
59-
ColumnsV1TableId
59+
ColumnsV1TableId,
60+
ColumnsV2TableId
6061
};
6162

6263
enum class ETierTables: ui32 {
@@ -569,6 +570,15 @@ struct Schema : NIceDb::Schema {
569570
using TColumns = TableColumns<PathId, PortionId, SSColumnId, ChunkIdx, Metadata, BlobIdx, Offset, Size>;
570571
};
571572

573+
struct IndexColumnsV2: Table<ColumnsV2TableId> {
574+
struct PathId: Column<1, NScheme::NTypeIds::Uint64> {};
575+
struct PortionId: Column<2, NScheme::NTypeIds::Uint64> {};
576+
struct Metadata: Column<3, NScheme::NTypeIds::String> {};
577+
578+
using TKey = TableKey<PathId, PortionId>;
579+
using TColumns = TableColumns<PathId, PortionId, Metadata>;
580+
};
581+
572582
using TTables = SchemaTables<
573583
Value,
574584
TxInfo,
@@ -607,7 +617,8 @@ struct Schema : NIceDb::Schema {
607617
TxDependencies,
608618
TxStates,
609619
TxEvents,
610-
IndexColumnsV1
620+
IndexColumnsV1,
621+
IndexColumnsV2
611622
>;
612623

613624
//
@@ -997,6 +1008,33 @@ class TColumnChunkLoadContextV1 {
9971008
YDB_READONLY_DEF(NKikimrTxColumnShard::TIndexColumnMeta, MetaProto);
9981009

9991010
public:
1011+
TPortionAddress GetPortionAddress() const {
1012+
return TPortionAddress(PathId, PortionId);
1013+
}
1014+
1015+
template <class TSource>
1016+
static void BuildFromDBV2(const TSource& rowset, std::vector<TColumnChunkLoadContextV1>& records) {
1017+
const ui64 pathId = rowset.template GetValue<NColumnShard::Schema::IndexColumnsV2::PathId>();
1018+
const ui64 portionId = rowset.template GetValue<NColumnShard::Schema::IndexColumnsV2::PortionId>();
1019+
const TString metadata = rowset.template GetValue<NColumnShard::Schema::IndexColumnsV2::Metadata>();
1020+
NKikimrTxColumnShard::TIndexPortionAccessor metaProto;
1021+
AFL_VERIFY(metaProto.ParseFromArray(metadata.data(), metadata.size()))("event", "cannot parse metadata as protobuf");
1022+
for (auto&& i : metaProto.GetChunks()) {
1023+
TColumnChunkLoadContextV1 result(pathId, portionId, TChunkAddress(i.GetSSColumnId(), i.GetChunkIdx()),
1024+
TBlobRangeLink16::BuildFromProto(i.GetBlobRangeLink()).DetachResult(), i.GetChunkMetadata());
1025+
records.emplace_back(std::move(result));
1026+
}
1027+
}
1028+
1029+
NKikimrTxColumnShard::TColumnChunkInfo SerializeToDBProto() const {
1030+
NKikimrTxColumnShard::TColumnChunkInfo proto;
1031+
proto.SetSSColumnId(Address.GetColumnId());
1032+
proto.SetChunkIdx(Address.GetChunkIdx());
1033+
*proto.MutableChunkMetadata() = MetaProto;
1034+
*proto.MutableBlobRangeLink() = BlobRange.SerializeToProto();
1035+
return proto;
1036+
}
1037+
10001038
TFullChunkAddress GetFullChunkAddress() const {
10011039
return TFullChunkAddress(PathId, PortionId, Address.GetEntityId(), Address.GetChunkIdx());
10021040
}

ydb/core/tx/columnshard/data_accessor/actor.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ class TActor: public TActorBootstrapped<TActor> {
4848
void Bootstrap();
4949

5050
STFUNC(StateWait) {
51+
const NActors::TLogContextGuard lGuard = NActors::TLogContextBuilder::Build()("self_id", SelfId())("tablet_id", TabletId)("parent", Parent);
5152
switch (ev->GetTypeRewrite()) {
5253
cFunc(NActors::TEvents::TEvPoison::EventType, StartStopping);
5354
hFunc(TEvRegisterController, Handle);

ydb/core/tx/columnshard/data_accessor/manager.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ class TLocalManager: public IDataAccessorsManager {
9696
const std::shared_ptr<IAccessorCallback> AccessorCallback;
9797

9898
virtual void DoAskData(const std::shared_ptr<TDataAccessorsRequest>& request) override {
99+
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "ask_data")("request", request->DebugString());
99100
for (auto&& i : request->GetPathIds()) {
100101
auto it = Managers.find(i);
101102
if (it == Managers.end()) {

ydb/core/tx/columnshard/data_accessor/request.h

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#pragma once
2+
#include <ydb/core/tx/columnshard/counters/common/object_counter.h>
23
#include <ydb/core/tx/columnshard/engines/portions/data_accessor.h>
34
#include <ydb/core/tx/columnshard/engines/portions/portion_info.h>
45

@@ -61,7 +62,7 @@ class TDataAccessorsResult {
6162
}
6263
};
6364

64-
class IDataAccessorRequestsSubscriber {
65+
class IDataAccessorRequestsSubscriber: public NColumnShard::TMonitoringObjectsCounter<IDataAccessorRequestsSubscriber> {
6566
private:
6667
THashSet<ui64> RequestIds;
6768

@@ -95,7 +96,6 @@ class IDataAccessorRequestsSubscriber {
9596
class TFakeDataAccessorsSubscriber: public IDataAccessorRequestsSubscriber {
9697
private:
9798
virtual void DoOnRequestsFinished(TDataAccessorsResult&& /*result*/) override {
98-
9999
}
100100
};
101101

@@ -117,6 +117,12 @@ class TPathFetchingState {
117117
THashMap<ui64, TPortionDataAccessor> PortionAccessors;
118118

119119
public:
120+
TString DebugString() const {
121+
TStringBuilder sb;
122+
sb << "portions_count=" << Portions.size();
123+
return sb;
124+
}
125+
120126
TPathFetchingState(const ui64 pathId)
121127
: PathId(pathId) {
122128
}
@@ -161,7 +167,7 @@ class TPathFetchingState {
161167
}
162168
};
163169

164-
class TDataAccessorsRequest {
170+
class TDataAccessorsRequest: public NColumnShard::TMonitoringObjectsCounter<TDataAccessorsRequest> {
165171
private:
166172
static inline TAtomicCounter Counter = 0;
167173
ui32 FetchStage = 0;
@@ -191,6 +197,15 @@ class TDataAccessorsRequest {
191197
}
192198

193199
public:
200+
TString DebugString() const {
201+
TStringBuilder sb;
202+
sb << "request_id=" << RequestId << ";";
203+
for (auto&& i : PathIdStatus) {
204+
sb << i.first << "={" << i.second.DebugString() << "};";
205+
}
206+
return sb;
207+
}
208+
194209
TDataAccessorsRequest() = default;
195210

196211
bool HasSubscriber() const {

ydb/core/tx/columnshard/engines/changes/actualization/construction/context.cpp

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,10 @@
66
namespace NKikimr::NOlap::NActualizer {
77

88
TTieringProcessContext::TTieringProcessContext(const ui64 memoryUsageLimit, const TSaverContext& saverContext,
9-
const std::shared_ptr<NDataLocks::TManager>& dataLocksManager, const NColumnShard::TEngineLogsCounters& counters, const std::shared_ptr<TController>& controller)
10-
: MemoryUsageLimit(memoryUsageLimit)
9+
const std::shared_ptr<NDataLocks::TManager>& dataLocksManager, const TVersionedIndex& versionedIndex,
10+
const NColumnShard::TEngineLogsCounters& counters, const std::shared_ptr<TController>& controller)
11+
: VersionedIndex(versionedIndex)
12+
, MemoryUsageLimit(memoryUsageLimit)
1113
, SaverContext(saverContext)
1214
, Counters(counters)
1315
, Controller(controller)
@@ -31,10 +33,10 @@ bool TTieringProcessContext::AddPortion(
3133
};
3234
auto it = Tasks.find(features.GetRWAddress());
3335
if (it == Tasks.end()) {
34-
std::vector<TTaskConstructor> tasks = {buildNewTask()};
36+
std::vector<TTaskConstructor> tasks = { buildNewTask() };
3537
it = Tasks.emplace(features.GetRWAddress(), std::move(tasks)).first;
3638
}
37-
if (it->second.back().GetTxWriteVolume() + info->GetTxVolume() > TGlobalLimits::TxWriteLimitBytes / 2 && it->second.back().GetTxWriteVolume()) {
39+
if (!it->second.back().CanTakePortionInTx(info, VersionedIndex)) {
3840
if (Controller->IsNewTaskAvailable(it->first, it->second.size())) {
3941
it->second.emplace_back(buildNewTask());
4042
} else {
@@ -53,7 +55,7 @@ bool TTieringProcessContext::AddPortion(
5355
}
5456
it->second.back().MutableMemoryUsage() = it->second.back().GetMemoryPredictor()->AddPortion(info);
5557
}
56-
it->second.back().MutableTxWriteVolume() += info->GetTxVolume();
58+
it->second.back().TakePortionInTx(info, VersionedIndex);
5759
if (features.GetTargetTierName() == NTiering::NCommon::DeleteTierName) {
5860
AFL_VERIFY(dWait);
5961
Counters.OnPortionToDrop(info->GetTotalBlobBytes(), *dWait);

ydb/core/tx/columnshard/engines/changes/actualization/construction/context.h

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,17 +15,34 @@ class TTaskConstructor {
1515
YDB_READONLY_DEF(std::shared_ptr<TColumnEngineChanges::IMemoryPredictor>, MemoryPredictor);
1616
YDB_READONLY_DEF(std::shared_ptr<TTTLColumnEngineChanges>, Task);
1717
YDB_ACCESSOR(ui64, MemoryUsage, 0);
18-
YDB_ACCESSOR(ui64, TxWriteVolume, 0);
18+
YDB_READONLY(ui64, PortionsCount, 0);
19+
YDB_READONLY(ui64, ChunksCount, 0);
20+
1921
public:
2022
TTaskConstructor(const std::shared_ptr<TColumnEngineChanges::IMemoryPredictor>& predictor, const std::shared_ptr<TTTLColumnEngineChanges>& task)
2123
: MemoryPredictor(predictor)
2224
, Task(task) {
2325

2426
}
27+
28+
bool CanTakePortionInTx(const TPortionInfo::TConstPtr& portion, const TVersionedIndex& index) {
29+
if (!PortionsCount) {
30+
return true;
31+
}
32+
return
33+
(PortionsCount + 1 < 1000) &&
34+
(ChunksCount + portion->GetApproxChunksCount(portion->GetSchema(index)->GetColumnsCount()) < 100000);
35+
}
36+
37+
void TakePortionInTx(const TPortionInfo::TConstPtr& portion, const TVersionedIndex& index) {
38+
++PortionsCount;
39+
ChunksCount += portion->GetApproxChunksCount(portion->GetSchema(index)->GetColumnsCount());
40+
}
2541
};
2642

2743
class TTieringProcessContext {
2844
private:
45+
const TVersionedIndex& VersionedIndex;
2946
THashSet<TPortionAddress> UsedPortions;
3047
const ui64 MemoryUsageLimit;
3148
TSaverContext SaverContext;
@@ -63,7 +80,8 @@ class TTieringProcessContext {
6380
}
6481
}
6582

66-
TTieringProcessContext(const ui64 memoryUsageLimit, const TSaverContext& saverContext, const std::shared_ptr<NDataLocks::TManager>& dataLocksManager,
83+
TTieringProcessContext(const ui64 memoryUsageLimit, const TSaverContext& saverContext,
84+
const std::shared_ptr<NDataLocks::TManager>& dataLocksManager, const TVersionedIndex& versionedIndex,
6785
const NColumnShard::TEngineLogsCounters& counters, const std::shared_ptr<TController>& controller);
6886
};
6987

0 commit comments

Comments
 (0)