Skip to content

Commit 67f4666

Browse files
Merge 6b7f597 into 18174b0
2 parents 18174b0 + 6b7f597 commit 67f4666

File tree

12 files changed

+72
-35
lines changed

12 files changed

+72
-35
lines changed

ydb/core/formats/arrow/reader/merger.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,13 +154,16 @@ void TMergePartialStream::DrainCurrentPosition(TRecordBatchBuilder* builder, std
154154
Y_ABORT_UNLESS(SortHeap.Size());
155155
Y_ABORT_UNLESS(!SortHeap.Current().IsControlPoint());
156156
if (!SortHeap.Current().IsDeleted()) {
157+
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("key_add", SortHeap.Current().GetKeyColumns().DebugJson().GetStringRobust());
157158
if (builder) {
158159
builder->AddRecord(SortHeap.Current().GetKeyColumns());
159160
}
160161
if (resultScanData && resultPosition) {
161162
*resultScanData = SortHeap.Current().GetKeyColumns().GetSorting();
162163
*resultPosition = SortHeap.Current().GetKeyColumns().GetPosition();
163164
}
165+
} else {
166+
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("key_skip", SortHeap.Current().GetKeyColumns().DebugJson().GetStringRobust());
164167
}
165168
CheckSequenceInDebug(SortHeap.Current().GetKeyColumns());
166169
const ui64 startPosition = SortHeap.Current().GetKeyColumns().GetPosition();
@@ -169,6 +172,7 @@ void TMergePartialStream::DrainCurrentPosition(TRecordBatchBuilder* builder, std
169172
bool isFirst = true;
170173
while (SortHeap.Size() && (isFirst || SortHeap.Current().GetKeyColumns().Compare(*startSorting, startPosition) == std::partial_ordering::equivalent)) {
171174
if (!isFirst) {
175+
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("key_skip1", SortHeap.Current().GetKeyColumns().DebugJson().GetStringRobust());
172176
auto& anotherIterator = SortHeap.Current();
173177
if (PossibleSameVersionFlag) {
174178
AFL_VERIFY(anotherIterator.GetVersionColumns().Compare(*startVersion, startPosition) != std::partial_ordering::greater)

ydb/core/formats/arrow/size_calcer.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ NKikimr::NArrow::TSerializedBatch TSerializedBatch::Build(std::shared_ptr<arrow:
5858
specialKeysPayload = specialKeys.SerializePayloadToString();
5959
specialKeysFull = specialKeys.SerializeFullToString();
6060
}
61-
return TSerializedBatch(NArrow::SerializeBatchNoCompression(batch), batch->num_rows(),
61+
return TSerializedBatch(NSerialization::TNativeSerializer().SerializePayload(batch), batch->num_rows(),
6262
NArrow::GetBatchDataSize(batch), specialKeysPayload, specialKeysFull);
6363
}
6464

ydb/core/kqp/ut/tx/kqp_sink_common.h

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,11 @@ class TTableDataModificationTester {
1919
YDB_ACCESSOR(bool, IsOlap, false);
2020
YDB_ACCESSOR(bool, FastSnapshotExpiration, false);
2121
YDB_ACCESSOR(bool, DisableSinks, false);
22-
22+
enum class EOlapIndexationPolicy {
23+
After,
24+
InProgress
25+
};
26+
EOlapIndexationPolicy OlapIndexationPolicy = EOlapIndexationPolicy::After;
2327
virtual void DoExecute() = 0;
2428
public:
2529
void Execute() {
@@ -39,7 +43,9 @@ class TTableDataModificationTester {
3943
auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>();
4044
csController->SetOverridePeriodicWakeupActivationPeriod(TDuration::Seconds(1));
4145
csController->SetOverrideLagForCompactionBeforeTierings(TDuration::Seconds(1));
42-
csController->DisableBackground(NKikimr::NYDBTest::ICSController::EBackground::Indexation);
46+
if (OlapIndexationPolicy == EOlapIndexationPolicy::After) {
47+
csController->DisableBackground(NKikimr::NYDBTest::ICSController::EBackground::Indexation);
48+
}
4349

4450
{
4551
auto type = IsOlap ? "COLUMN" : "ROW";
@@ -98,10 +104,11 @@ class TTableDataModificationTester {
98104
)", TTxControl::NoTx()).GetValueSync();
99105
UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
100106
}
101-
102107
DoExecute();
103-
csController->EnableBackground(NKikimr::NYDBTest::ICSController::EBackground::Indexation);
104-
csController->WaitIndexation(TDuration::Seconds(5));
108+
if (OlapIndexationPolicy == EOlapIndexationPolicy::After) {
109+
csController->EnableBackground(NKikimr::NYDBTest::ICSController::EBackground::Indexation);
110+
csController->WaitIndexation(TDuration::Seconds(5));
111+
}
105112
}
106113

107114
};

ydb/core/tx/columnshard/engines/storage/granule/granule.cpp

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,11 +45,12 @@ bool TGranuleMeta::ErasePortion(const ui64 portion) {
4545
return true;
4646
}
4747

48-
void TGranuleMeta::OnAfterChangePortion(const std::shared_ptr<TPortionInfo> portionAfter, NStorageOptimizer::IOptimizerPlanner::TModificationGuard* modificationGuard) {
48+
void TGranuleMeta::OnAfterChangePortion(const std::shared_ptr<TPortionInfo> portionAfter,
49+
NStorageOptimizer::IOptimizerPlanner::TModificationGuard* modificationGuard, const bool onLoad) {
4950
if (portionAfter) {
5051
PortionInfoGuard.OnNewPortion(portionAfter);
52+
// PortionsIndex.AddPortion(portionAfter, onLoad);
5153
if (!portionAfter->HasRemoveSnapshot()) {
52-
PortionsIndex.AddPortion(portionAfter);
5354
if (modificationGuard) {
5455
modificationGuard->AddPortion(portionAfter);
5556
} else {
@@ -74,8 +75,8 @@ void TGranuleMeta::OnAfterChangePortion(const std::shared_ptr<TPortionInfo> port
7475
void TGranuleMeta::OnBeforeChangePortion(const std::shared_ptr<TPortionInfo> portionBefore) {
7576
if (portionBefore) {
7677
PortionInfoGuard.OnDropPortion(portionBefore);
78+
// PortionsIndex.RemovePortion(portionBefore);
7779
if (!portionBefore->HasRemoveSnapshot()) {
78-
PortionsIndex.RemovePortion(portionBefore);
7980
OptimizerPlanner->StartModificationGuard().RemovePortion(portionBefore);
8081
ActualizationIndex->RemovePortion(portionBefore);
8182
}

ydb/core/tx/columnshard/engines/storage/granule/granule.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,8 @@ class TGranuleMeta: TNonCopyable {
159159
NGranule::NPortionsIndex::TPortionsIndex PortionsIndex;
160160

161161
void OnBeforeChangePortion(const std::shared_ptr<TPortionInfo> portionBefore);
162-
void OnAfterChangePortion(const std::shared_ptr<TPortionInfo> portionAfter, NStorageOptimizer::IOptimizerPlanner::TModificationGuard* modificationGuard);
162+
void OnAfterChangePortion(const std::shared_ptr<TPortionInfo> portionAfter,
163+
NStorageOptimizer::IOptimizerPlanner::TModificationGuard* modificationGuard, const bool onLoad = false);
163164
void OnAdditiveSummaryChange() const;
164165
YDB_READONLY(TMonotonic, LastCompactionInstant, TMonotonic::Zero());
165166
public:
@@ -228,8 +229,9 @@ class TGranuleMeta: TNonCopyable {
228229
void OnAfterPortionsLoad() {
229230
auto g = OptimizerPlanner->StartModificationGuard();
230231
for (auto&& i : Portions) {
231-
OnAfterChangePortion(i.second, &g);
232+
OnAfterChangePortion(i.second, &g, true);
232233
}
234+
// PortionsIndex.OnLoadFinished();
233235
}
234236

235237
std::shared_ptr<NArrow::NSplitter::TSerializationStats> BuildSerializationStats(ISnapshotSchema::TPtr schema) const {

ydb/core/tx/columnshard/engines/storage/granule/portions_index.cpp

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -58,10 +58,10 @@ void TPortionsIndex::RemovePortion(const std::shared_ptr<TPortionInfo>& p) {
5858
const TPortionInfoStat stat(p);
5959
auto it = itFrom;
6060
while (true) {
61-
RemoveFromMemoryUsageControl(it->second.GetIntervalStats());
61+
// RemoveFromMemoryUsageControl(it->second.GetIntervalStats());
6262
it->second.RemoveContained(stat);
63-
RawMemoryUsage.Add(it->second.GetIntervalStats().GetMinRawBytes());
64-
BlobMemoryUsage.Add(it->second.GetIntervalStats().GetBlobBytes());
63+
// RawMemoryUsage.Add(it->second.GetIntervalStats().GetMinRawBytes());
64+
// BlobMemoryUsage.Add(it->second.GetIntervalStats().GetBlobBytes());
6565
if (it == itTo) {
6666
break;
6767
}
@@ -71,43 +71,46 @@ void TPortionsIndex::RemovePortion(const std::shared_ptr<TPortionInfo>& p) {
7171
if (itFrom != itTo) {
7272
itFrom->second.RemoveStart(p);
7373
if (itFrom->second.IsEmpty()) {
74-
RemoveFromMemoryUsageControl(itFrom->second.GetIntervalStats());
74+
// RemoveFromMemoryUsageControl(itFrom->second.GetIntervalStats());
7575
Points.erase(itFrom);
7676
}
7777
itTo->second.RemoveFinish(p);
7878
if (itTo->second.IsEmpty()) {
79-
RemoveFromMemoryUsageControl(itTo->second.GetIntervalStats());
79+
// RemoveFromMemoryUsageControl(itTo->second.GetIntervalStats());
8080
Points.erase(itTo);
8181
}
8282
} else {
8383
itTo->second.RemoveStart(p);
8484
itTo->second.RemoveFinish(p);
8585
if (itTo->second.IsEmpty()) {
86-
RemoveFromMemoryUsageControl(itTo->second.GetIntervalStats());
86+
// RemoveFromMemoryUsageControl(itTo->second.GetIntervalStats());
8787
Points.erase(itTo);
8888
}
8989
}
9090
RawMemoryUsage.FlushCounters();
9191
BlobMemoryUsage.FlushCounters();
9292
}
9393

94-
void TPortionsIndex::AddPortion(const std::shared_ptr<TPortionInfo>& p) {
94+
void TPortionsIndex::AddPortion(const std::shared_ptr<TPortionInfo>& p, const bool onLoad) {
9595
auto itFrom = InsertPoint(p->IndexKeyStart());
9696
itFrom->second.AddStart(p);
9797
auto itTo = InsertPoint(p->IndexKeyEnd());
9898
itTo->second.AddFinish(p);
9999

100-
auto it = itFrom;
101100
const TPortionInfoStat stat(p);
102-
while (true) {
103-
RemoveFromMemoryUsageControl(it->second.GetIntervalStats());
101+
for (auto it = itFrom;; ++it) {
102+
AFL_VERIFY(it != Points.end());
103+
if (!onLoad) {
104+
// RemoveFromMemoryUsageControl(it->second.GetIntervalStats());
105+
}
104106
it->second.AddContained(stat);
105-
RawMemoryUsage.Add(it->second.GetIntervalStats().GetMinRawBytes());
106-
BlobMemoryUsage.Add(it->second.GetIntervalStats().GetBlobBytes());
107+
if (!onLoad) {
108+
// RawMemoryUsage.Add(it->second.GetIntervalStats().GetMinRawBytes());
109+
// BlobMemoryUsage.Add(it->second.GetIntervalStats().GetBlobBytes());
110+
}
107111
if (it == itTo) {
108112
break;
109113
}
110-
AFL_VERIFY(++it != Points.end());
111114
}
112115
RawMemoryUsage.FlushCounters();
113116
BlobMemoryUsage.FlushCounters();

ydb/core/tx/columnshard/engines/storage/granule/portions_index.h

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,7 @@ class TPortionsIndex {
160160
TIntervalMemoryMonitoring RawMemoryUsage;
161161
TIntervalMemoryMonitoring BlobMemoryUsage;
162162
const TGranuleMeta& Owner;
163+
bool Initialized = false;
163164

164165
std::map<NArrow::TReplaceKey, TPortionsPKPoint>::iterator InsertPoint(const NArrow::TReplaceKey& key) {
165166
auto it = Points.find(key);
@@ -170,15 +171,15 @@ class TPortionsIndex {
170171
--itPred;
171172
it->second.ProvidePortions(itPred->second);
172173
}
173-
RawMemoryUsage.Add(it->second.GetIntervalStats().GetMinRawBytes());
174-
BlobMemoryUsage.Add(it->second.GetIntervalStats().GetBlobBytes());
174+
// RawMemoryUsage.Add(it->second.GetIntervalStats().GetMinRawBytes());
175+
// BlobMemoryUsage.Add(it->second.GetIntervalStats().GetBlobBytes());
175176
}
176177
return it;
177178
}
178179

179180
void RemoveFromMemoryUsageControl(const TIntervalInfoStat& stat) {
180-
RawMemoryUsage.Remove(stat.GetMinRawBytes());
181-
BlobMemoryUsage.Remove(stat.GetBlobBytes());
181+
// RawMemoryUsage.Remove(stat.GetMinRawBytes());
182+
// BlobMemoryUsage.Remove(stat.GetBlobBytes());
182183
}
183184

184185
public:
@@ -202,9 +203,16 @@ class TPortionsIndex {
202203
return Points;
203204
}
204205

205-
void AddPortion(const std::shared_ptr<TPortionInfo>& p);
206-
206+
void AddPortion(const std::shared_ptr<TPortionInfo>& p, const bool onLoad = false);
207207
void RemovePortion(const std::shared_ptr<TPortionInfo>& p);
208+
void OnLoadFinished() {
209+
AFL_VERIFY(!Initialized);
210+
Initialized = true;
211+
// for (auto&& i : Points) {
212+
// RawMemoryUsage.Add(i.second.GetIntervalStats().GetMinRawBytes());
213+
// BlobMemoryUsage.Add(i.second.GetIntervalStats().GetBlobBytes());
214+
// }
215+
}
208216

209217
class TPortionIntervals {
210218
private:

ydb/core/tx/columnshard/engines/storage/granule/storage.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,10 @@ std::shared_ptr<NKikimr::NOlap::TGranuleMeta> TGranulesStorage::GetGranuleForCom
1616
if (gPriority.IsZero() || (priorityChecker && gPriority < *priorityChecker)) {
1717
continue;
1818
}
19+
if (i.second.GetPortionsIndex().GetMinRawMemoryRead() > 2.6 * 1024 * 1024 ||
20+
i.second.GetPortionsIndex().GetMinBlobMemoryRead() > 1.5 * 1024 * 1024) {
21+
gPriority.Force();
22+
}
1923
granulesSorted.emplace(gPriority, i.second);
2024
if (++countChecker % 100 == 0) {
2125
for (auto&& it = granulesSorted.rbegin(); it != granulesSorted.rend(); ++it) {

ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,10 @@ class TOptimizationPriority {
4444
return TStringBuilder() << "(" << Level << "," << InternalLevelWeight << ")";
4545
}
4646

47+
void Force() {
48+
Level += 100;
49+
}
50+
4751
static TOptimizationPriority Critical(const i64 weight) {
4852
return TOptimizationPriority(10, weight);
4953
}

ydb/core/tx/columnshard/operations/manager.cpp

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -255,9 +255,6 @@ TConclusion<EOperationBehaviour> TOperationsManager::GetBehaviour(const NEvents:
255255
return EOperationBehaviour::NoTxWrite;
256256
}
257257

258-
if (evWrite.Record.HasTxId() && evWrite.Record.GetTxMode() == NKikimrDataEvents::TEvWrite::MODE_PREPARE) {
259-
return EOperationBehaviour::InTxWrite;
260-
}
261258
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("proto", evWrite.Record.DebugString())("event", "undefined behaviour");
262259
return TConclusionStatus::Fail("undefined request for detect tx type");
263260
}

0 commit comments

Comments
 (0)