Skip to content

Commit a04c6f1

Browse files
Merge 69c6bee into 18848f2
2 parents 18848f2 + 69c6bee commit a04c6f1

File tree

10 files changed

+31
-115
lines changed

10 files changed

+31
-115
lines changed

ydb/core/tx/columnshard/blobs_reader/actor.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ void TActor::Handle(NBlobCache::TEvBlobCache::TEvReadBlobRangeResult::TPtr& ev)
1515
bool aborted = false;
1616
if (event.Status != NKikimrProto::EReplyStatus::OK) {
1717
WaitingBlobsCount.Sub(Task->GetWaitingCount());
18-
if (!Task->AddError(event.BlobRange, IBlobsReadingAction::TErrorStatus::Fail(event.Status, "cannot get blob: " + event.Data.substr(1024)))) {
18+
if (!Task->AddError(event.BlobRange, IBlobsReadingAction::TErrorStatus::Fail(event.Status, "cannot get blob: " + event.Data.substr(0, 1024)))) {
1919
aborted = true;
2020
}
2121
} else {

ydb/core/tx/columnshard/engines/portions/column_record.cpp

+2-6
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,6 @@ TChunkMeta::TChunkMeta(const TColumnChunkLoadContext& context, const TIndexInfo&
1414
if (context.GetMetaProto().HasRawBytes()) {
1515
RawBytes = context.GetMetaProto().GetRawBytes();
1616
}
17-
if (context.GetMetaProto().HasMinValue()) {
18-
AFL_VERIFY(field)("field_id", context.GetAddress().GetColumnId())("field_name", indexInfo.GetColumnName(context.GetAddress().GetColumnId()));
19-
Min = ConstantToScalar(context.GetMetaProto().GetMinValue(), field->type());
20-
}
2117
if (context.GetMetaProto().HasMaxValue()) {
2218
AFL_VERIFY(field)("field_id", context.GetAddress().GetColumnId())("field_name", indexInfo.GetColumnName(context.GetAddress().GetColumnId()));
2319
Max = ConstantToScalar(context.GetMetaProto().GetMaxValue(), field->type());
@@ -37,9 +33,9 @@ NKikimrTxColumnShard::TIndexColumnMeta TChunkMeta::SerializeToProto() const {
3733
if (RawBytes) {
3834
meta.SetRawBytes(*RawBytes);
3935
}
40-
if (HasMinMax()) {
41-
ScalarToConstant(*Min, *meta.MutableMinValue());
36+
if (HasMax()) {
4237
ScalarToConstant(*Max, *meta.MutableMaxValue());
38+
ScalarToConstant(*Max, *meta.MutableMinValue());
4339
}
4440
return meta;
4541
}

ydb/core/tx/columnshard/engines/portions/portion_info.cpp

-36
Original file line numberDiff line numberDiff line change
@@ -43,21 +43,6 @@ void TPortionInfo::AddMetadata(const ISnapshotSchema& snapshotSchema, const NArr
4343
Meta.SetTierName(tierName);
4444
}
4545

46-
std::shared_ptr<arrow::Scalar> TPortionInfo::MinValue(ui32 columnId) const {
47-
std::shared_ptr<arrow::Scalar> result;
48-
for (auto&& i : Records) {
49-
if (i.ColumnId == columnId) {
50-
if (!i.GetMeta().GetMin()) {
51-
return nullptr;
52-
}
53-
if (!result || NArrow::ScalarCompare(result, i.GetMeta().GetMin()) > 0) {
54-
result = i.GetMeta().GetMin();
55-
}
56-
}
57-
}
58-
return result;
59-
}
60-
6146
std::shared_ptr<arrow::Scalar> TPortionInfo::MaxValue(ui32 columnId) const {
6247
std::shared_ptr<arrow::Scalar> result;
6348
for (auto&& i : Records) {
@@ -130,14 +115,6 @@ ui64 TPortionInfo::GetIndexBytes(const std::set<ui32>& entityIds) const {
130115
return sum;
131116
}
132117

133-
int TPortionInfo::CompareSelfMaxItemMinByPk(const TPortionInfo& item, const TIndexInfo& info) const {
134-
return CompareByColumnIdsImpl<TMaxGetter, TMinGetter>(item, info.KeyColumns);
135-
}
136-
137-
int TPortionInfo::CompareMinByPk(const TPortionInfo& item, const TIndexInfo& info) const {
138-
return CompareMinByColumnIds(item, info.KeyColumns);
139-
}
140-
141118
TString TPortionInfo::DebugString(const bool withDetails) const {
142119
TStringBuilder sb;
143120
sb << "(portion_id:" << Portion << ";" <<
@@ -179,19 +156,6 @@ void TPortionInfo::AddRecord(const TIndexInfo& indexInfo, const TColumnRecord& r
179156
}
180157
}
181158

182-
bool TPortionInfo::HasPkMinMax() const {
183-
bool result = false;
184-
for (auto&& i : Records) {
185-
if (i.ColumnId == Meta.FirstPkColumn) {
186-
if (!i.GetMeta().HasMinMax()) {
187-
return false;
188-
}
189-
result = true;
190-
}
191-
}
192-
return result;
193-
}
194-
195159
std::vector<const NKikimr::NOlap::TColumnRecord*> TPortionInfo::GetColumnChunksPointers(const ui32 columnId) const {
196160
std::vector<const TColumnRecord*> result;
197161
for (auto&& c : Records) {

ydb/core/tx/columnshard/engines/portions/portion_info.h

+1-44
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ class TPortionInfo {
2323
TSnapshot MinSnapshot = TSnapshot::Zero(); // {PlanStep, TxId} is min snapshot for {Granule, Portion}
2424
TSnapshot RemoveSnapshot = TSnapshot::Zero(); // {XPlanStep, XTxId} is snapshot where the blob has been removed (i.e. compacted into another one)
2525

26-
bool HasPkMinMax() const;
2726
TPortionMeta Meta;
2827
std::shared_ptr<NOlap::IBlobsStorageOperator> BlobsOperator;
2928
ui64 DeprecatedGranuleId = 0;
@@ -190,7 +189,7 @@ class TPortionInfo {
190189

191190
bool Empty() const { return Records.empty(); }
192191
bool Produced() const { return Meta.GetProduced() != TPortionMeta::EProduced::UNSPECIFIED; }
193-
bool Valid() const { return MinSnapshot.Valid() && PathId && Portion && !Empty() && Produced() && HasPkMinMax() && Meta.IndexKeyStart && Meta.IndexKeyEnd; }
192+
bool Valid() const { return MinSnapshot.Valid() && PathId && Portion && !Empty() && Produced() && Meta.IndexKeyStart && Meta.IndexKeyEnd; }
194193
bool ValidSnapshotInfo() const { return MinSnapshot.Valid() && PathId && Portion; }
195194
bool IsInserted() const { return Meta.GetProduced() == TPortionMeta::EProduced::INSERTED; }
196195
bool IsEvicted() const { return Meta.GetProduced() == TPortionMeta::EProduced::EVICTED; }
@@ -338,7 +337,6 @@ class TPortionInfo {
338337
void AddMetadata(const ISnapshotSchema& snapshotSchema, const NArrow::TFirstLastSpecialKeys& primaryKeys, const NArrow::TMinMaxSpecialKeys& snapshotKeys,
339338
const TString& tierName);
340339

341-
std::shared_ptr<arrow::Scalar> MinValue(ui32 columnId) const;
342340
std::shared_ptr<arrow::Scalar> MaxValue(ui32 columnId) const;
343341

344342
const NArrow::TReplaceKey& IndexKeyStart() const {
@@ -414,48 +412,7 @@ class TPortionInfo {
414412
return GetRawBytes();
415413
}
416414

417-
private:
418-
class TMinGetter {
419-
public:
420-
static std::shared_ptr<arrow::Scalar> Get(const TPortionInfo& portionInfo, const ui32 columnId) {
421-
return portionInfo.MinValue(columnId);
422-
}
423-
};
424-
425-
class TMaxGetter {
426-
public:
427-
static std::shared_ptr<arrow::Scalar> Get(const TPortionInfo& portionInfo, const ui32 columnId) {
428-
return portionInfo.MaxValue(columnId);
429-
}
430-
};
431-
432-
template <class TSelfGetter, class TItemGetter = TSelfGetter>
433-
int CompareByColumnIdsImpl(const TPortionInfo& item, const std::vector<ui32>& columnIds) const {
434-
for (auto&& i : columnIds) {
435-
std::shared_ptr<arrow::Scalar> valueSelf = TSelfGetter::Get(*this, i);
436-
std::shared_ptr<arrow::Scalar> valueItem = TItemGetter::Get(item, i);
437-
if (!!valueSelf && !!valueItem) {
438-
const int cmpResult = NArrow::ScalarCompare(valueSelf, valueItem);
439-
if (cmpResult) {
440-
return cmpResult;
441-
}
442-
} else if (!!valueSelf) {
443-
return 1;
444-
} else if (!!valueItem) {
445-
return -1;
446-
}
447-
}
448-
return 0;
449-
}
450415
public:
451-
int CompareSelfMaxItemMinByPk(const TPortionInfo& item, const TIndexInfo& info) const;
452-
453-
int CompareMinByPk(const TPortionInfo& item, const TIndexInfo& info) const;
454-
455-
int CompareMinByColumnIds(const TPortionInfo& item, const std::vector<ui32>& columnIds) const {
456-
return CompareByColumnIdsImpl<TMinGetter>(item, columnIds);
457-
}
458-
459416
class TAssembleBlobInfo {
460417
private:
461418
ui32 NullRowsCount = 0;

ydb/core/tx/columnshard/normalizer/abstract/abstract.h

+6-1
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,11 @@ namespace NKikimr::NOlap {
4949
class TNormalizationContext {
5050
YDB_ACCESSOR_DEF(TActorId, ResourceSubscribeActor);
5151
YDB_ACCESSOR_DEF(TActorId, ColumnshardActor);
52+
std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TResourcesGuard> ResourcesGuard;
53+
public:
54+
void SetResourcesGuard(std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TResourcesGuard> rg) {
55+
ResourcesGuard = rg;
56+
}
5257
};
5358

5459
class TNormalizationController;
@@ -117,7 +122,7 @@ namespace NKikimr::NOlap {
117122
TString DebugString() const {
118123
return TStringBuilder() << "normalizers_count=" << Normalizers.size()
119124
<< ";current_normalizer_idx=" << CurrentNormalizerIndex
120-
<< ";current_normalizer=" << (CurrentNormalizerIndex < Normalizers.size()) ? Normalizers[CurrentNormalizerIndex]->GetName() : "";
125+
<< ";current_normalizer=" << (CurrentNormalizerIndex < Normalizers.size() ? Normalizers[CurrentNormalizerIndex]->GetName() : "");
121126
}
122127

123128
const INormalizerComponent::TPtr& GetNormalizer() const;

ydb/core/tx/columnshard/normalizer/portion/chunks.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ class TRowsAndBytesChangesTask: public NConveyor::ITask {
6868
}
6969

7070
public:
71-
TRowsAndBytesChangesTask(THashMap<NKikimr::NOlap::TBlobRange, TString>&& blobs, const TNormalizationContext& nCtx, std::vector<TChunksNormalizer::TChunkInfo>&& chunks, THashMap<ui64, ISnapshotSchema::TPtr>&&)
71+
TRowsAndBytesChangesTask(THashMap<NKikimr::NOlap::TBlobRange, TString>&& blobs, const TNormalizationContext& nCtx, std::vector<TChunksNormalizer::TChunkInfo>&& chunks, std::shared_ptr<THashMap<ui64, ISnapshotSchema::TPtr>>)
7272
: Blobs(std::move(blobs))
7373
, Chunks(std::move(chunks))
7474
, NormContext(nCtx)

ydb/core/tx/columnshard/normalizer/portion/min_max.cpp

+9-9
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,16 @@ class TMinMaxSnapshotChangesTask: public NConveyor::ITask {
1616
private:
1717
THashMap<NKikimr::NOlap::TBlobRange, TString> Blobs;
1818
TDataContainer Portions;
19-
THashMap<ui64, ISnapshotSchema::TPtr> Schemas;
19+
std::shared_ptr<THashMap<ui64, ISnapshotSchema::TPtr>> Schemas;
2020
TNormalizationContext NormContext;
2121
protected:
2222
virtual bool DoExecute() override {
23-
Y_ABORT_UNLESS(!Schemas.empty());
24-
auto pkColumnIds = Schemas.begin()->second->GetPkColumnsIds();
23+
Y_ABORT_UNLESS(!Schemas->empty());
24+
auto pkColumnIds = Schemas->begin()->second->GetPkColumnsIds();
2525
pkColumnIds.insert(TIndexInfo::GetSpecialColumnIds().begin(), TIndexInfo::GetSpecialColumnIds().end());
2626

2727
for (auto&& portionInfo : Portions) {
28-
auto blobSchema = Schemas.FindPtr(portionInfo->GetPortionId());
28+
auto blobSchema = Schemas->FindPtr(portionInfo->GetPortionId());
2929
THashMap<TBlobRange, TPortionInfo::TAssembleBlobInfo> blobsDataAssemble;
3030
for (auto&& i : portionInfo->Records) {
3131
auto blobIt = Blobs.find(i.BlobRange);
@@ -47,10 +47,10 @@ class TMinMaxSnapshotChangesTask: public NConveyor::ITask {
4747
}
4848

4949
public:
50-
TMinMaxSnapshotChangesTask(THashMap<NKikimr::NOlap::TBlobRange, TString>&& blobs, const TNormalizationContext& nCtx, TDataContainer&& portions, THashMap<ui64, ISnapshotSchema::TPtr>&& schemas)
50+
TMinMaxSnapshotChangesTask(THashMap<NKikimr::NOlap::TBlobRange, TString>&& blobs, const TNormalizationContext& nCtx, TDataContainer&& portions, std::shared_ptr<THashMap<ui64, ISnapshotSchema::TPtr>> schemas)
5151
: Blobs(std::move(blobs))
5252
, Portions(std::move(portions))
53-
, Schemas(std::move(schemas))
53+
, Schemas(schemas)
5454
, NormContext(nCtx)
5555
{}
5656

@@ -135,7 +135,7 @@ TConclusion<std::vector<INormalizerTask::TPtr>> TPortionsNormalizer::Init(const
135135
}
136136

137137
THashMap<ui64, std::shared_ptr<TPortionInfo>> portions;
138-
THashMap<ui64, ISnapshotSchema::TPtr> schemas;
138+
auto schemas = std::make_shared<THashMap<ui64, ISnapshotSchema::TPtr>>();
139139
auto pkColumnIds = TMinMaxSnapshotChangesTask::GetColumnsFilter(tablesManager.GetPrimaryIndexSafe().GetVersionedIndex().GetLastSchema());
140140

141141
{
@@ -161,7 +161,7 @@ TConclusion<std::vector<INormalizerTask::TPtr>> TPortionsNormalizer::Init(const
161161
auto portionMeta = loadContext.GetPortionMeta();
162162
if (it == portions.end()) {
163163
Y_ABORT_UNLESS(portion.Records.empty());
164-
schemas[portion.GetPortionId()] = currentSchema;
164+
(*schemas)[portion.GetPortionId()] = currentSchema;
165165
auto portionNew = std::make_shared<TPortionInfo>(portion);
166166
portionNew->AddRecord(currentSchema->GetIndexInfo(), rec, portionMeta);
167167
it = portions.emplace(portion.GetPortion(), portionNew).first;
@@ -202,7 +202,7 @@ TConclusion<std::vector<INormalizerTask::TPtr>> TPortionsNormalizer::Init(const
202202
}
203203
++brokenPortioncCount;
204204
package.emplace_back(portion.second);
205-
if (package.size() == 100) {
205+
if (package.size() == 1000) {
206206
std::vector<std::shared_ptr<TPortionInfo>> local;
207207
local.swap(package);
208208
tasks.emplace_back(std::make_shared<TPortionsNormalizerTask<TMinMaxSnapshotChangesTask>>(std::move(local), schemas));

ydb/core/tx/columnshard/normalizer/portion/normalizer.h

+7-7
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,11 @@ class TReadPortionsTask: public NOlap::NBlobOperations::NRead::ITask {
1818
private:
1919
using TBase = NOlap::NBlobOperations::NRead::ITask;
2020
typename TConveyorTask::TDataContainer Data;
21-
THashMap<ui64, ISnapshotSchema::TPtr> Schemas;
21+
std::shared_ptr<THashMap<ui64, ISnapshotSchema::TPtr>> Schemas;
2222
TNormalizationContext NormContext;
2323

2424
public:
25-
TReadPortionsTask(const TNormalizationContext& nCtx, const std::vector<std::shared_ptr<IBlobsReadingAction>>& actions, typename TConveyorTask::TDataContainer&& data, THashMap<ui64, ISnapshotSchema::TPtr>&& schemas)
25+
TReadPortionsTask(const TNormalizationContext& nCtx, const std::vector<std::shared_ptr<IBlobsReadingAction>>& actions, typename TConveyorTask::TDataContainer&& data, std::shared_ptr<THashMap<ui64, ISnapshotSchema::TPtr>> schemas)
2626
: TBase(actions, "CS::NORMALIZER")
2727
, Data(std::move(data))
2828
, Schemas(std::move(schemas))
@@ -32,8 +32,8 @@ class TReadPortionsTask: public NOlap::NBlobOperations::NRead::ITask {
3232

3333
protected:
3434
virtual void DoOnDataReady(const std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TResourcesGuard>& resourcesGuard) override {
35-
Y_UNUSED(resourcesGuard);
36-
std::shared_ptr<NConveyor::ITask> task = std::make_shared<TConveyorTask>(std::move(ExtractBlobsData()), NormContext, std::move(Data), std::move(Schemas));
35+
NormContext.SetResourcesGuard(resourcesGuard);
36+
std::shared_ptr<NConveyor::ITask> task = std::make_shared<TConveyorTask>(std::move(ExtractBlobsData()), NormContext, std::move(Data), Schemas);
3737
NConveyor::TCompServiceOperator::SendTaskToExecute(task);
3838
}
3939

@@ -49,13 +49,13 @@ class TReadPortionsTask: public NOlap::NBlobOperations::NRead::ITask {
4949
template <class TConveyorTask>
5050
class TPortionsNormalizerTask : public INormalizerTask {
5151
typename TConveyorTask::TDataContainer Package;
52-
THashMap<ui64, ISnapshotSchema::TPtr> Schemas;
52+
std::shared_ptr<THashMap<ui64, ISnapshotSchema::TPtr>> Schemas;
5353
public:
5454
TPortionsNormalizerTask(typename TConveyorTask::TDataContainer&& package)
5555
: Package(std::move(package))
5656
{}
5757

58-
TPortionsNormalizerTask(typename TConveyorTask::TDataContainer&& package, const THashMap<ui64, ISnapshotSchema::TPtr>& schemas)
58+
TPortionsNormalizerTask(typename TConveyorTask::TDataContainer&& package, const std::shared_ptr<THashMap<ui64, ISnapshotSchema::TPtr>> schemas)
5959
: Package(std::move(package))
6060
, Schemas(schemas)
6161
{}
@@ -71,7 +71,7 @@ class TPortionsNormalizerTask : public INormalizerTask {
7171
std::vector<std::shared_ptr<IBlobsReadingAction>> actions = {readingAction};
7272
NOlap::NResourceBroker::NSubscribe::ITask::StartResourceSubscription(
7373
nCtx.GetResourceSubscribeActor(),std::make_shared<NOlap::NBlobOperations::NRead::ITask::TReadSubscriber>(
74-
std::make_shared<TReadPortionsTask<TConveyorTask>>( nCtx, actions, std::move(Package), std::move(Schemas) ), 1, memSize, "CS::NORMALIZER", controller.GetTaskSubscription()));
74+
std::make_shared<TReadPortionsTask<TConveyorTask>>(nCtx, actions, std::move(Package), Schemas), 1, memSize, "CS::NORMALIZER", controller.GetTaskSubscription()));
7575
}
7676
};
7777
}

ydb/core/tx/columnshard/splitter/chunk_meta.cpp

+2-4
Original file line numberDiff line numberDiff line change
@@ -4,24 +4,22 @@
44

55
namespace NKikimr::NOlap {
66

7-
TSimpleChunkMeta::TSimpleChunkMeta(const std::shared_ptr<arrow::Array>& column, const bool needMinMax, const bool isSortedColumn) {
7+
TSimpleChunkMeta::TSimpleChunkMeta(const std::shared_ptr<arrow::Array>& column, const bool needMax, const bool isSortedColumn) {
88
Y_ABORT_UNLESS(column);
99
Y_ABORT_UNLESS(column->length());
1010
NumRows = column->length();
1111
RawBytes = NArrow::GetArrayDataSize(column);
1212

13-
if (needMinMax) {
13+
if (needMax) {
1414
std::pair<i32, i32> minMaxPos = {0, (column->length() - 1)};
1515
if (!isSortedColumn) {
1616
minMaxPos = NArrow::FindMinMaxPosition(column);
1717
Y_ABORT_UNLESS(minMaxPos.first >= 0);
1818
Y_ABORT_UNLESS(minMaxPos.second >= 0);
1919
}
2020

21-
Min = NArrow::GetScalar(column, minMaxPos.first);
2221
Max = NArrow::GetScalar(column, minMaxPos.second);
2322

24-
Y_ABORT_UNLESS(Min);
2523
Y_ABORT_UNLESS(Max);
2624
}
2725
}

ydb/core/tx/columnshard/splitter/chunk_meta.h

+2-6
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ namespace NKikimr::NOlap {
1212

1313
class TSimpleChunkMeta {
1414
protected:
15-
std::shared_ptr<arrow::Scalar> Min;
1615
std::shared_ptr<arrow::Scalar> Max;
1716
std::optional<ui32> NumRows;
1817
std::optional<ui32> RawBytes;
@@ -25,9 +24,6 @@ class TSimpleChunkMeta {
2524
return sizeof(ui32) + sizeof(ui32) + 8 * 3 * 2;
2625
}
2726

28-
std::shared_ptr<arrow::Scalar> GetMin() const {
29-
return Min;
30-
}
3127
std::shared_ptr<arrow::Scalar> GetMax() const {
3228
return Max;
3329
}
@@ -49,8 +45,8 @@ class TSimpleChunkMeta {
4945
return *RawBytes;
5046
}
5147

52-
bool HasMinMax() const noexcept {
53-
return Min.get() && Max.get();
48+
bool HasMax() const noexcept {
49+
return Max.get();
5450
}
5551

5652
};

0 commit comments

Comments
 (0)