Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/blobs_reader/actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ void TActor::Handle(NBlobCache::TEvBlobCache::TEvReadBlobRangeResult::TPtr& ev)
bool aborted = false;
if (event.Status != NKikimrProto::EReplyStatus::OK) {
WaitingBlobsCount.Sub(Task->GetWaitingCount());
if (!Task->AddError(event.BlobRange, IBlobsReadingAction::TErrorStatus::Fail(event.Status, "cannot get blob: " + event.Data.substr(1024)))) {
if (!Task->AddError(event.BlobRange, IBlobsReadingAction::TErrorStatus::Fail(event.Status, "cannot get blob: " + event.Data.substr(0, 1024)))) {
aborted = true;
}
} else {
Expand Down
8 changes: 2 additions & 6 deletions ydb/core/tx/columnshard/engines/portions/column_record.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,6 @@ TChunkMeta::TChunkMeta(const TColumnChunkLoadContext& context, const TIndexInfo&
if (context.GetMetaProto().HasRawBytes()) {
RawBytes = context.GetMetaProto().GetRawBytes();
}
if (context.GetMetaProto().HasMinValue()) {
AFL_VERIFY(field)("field_id", context.GetAddress().GetColumnId())("field_name", indexInfo.GetColumnName(context.GetAddress().GetColumnId()));
Min = ConstantToScalar(context.GetMetaProto().GetMinValue(), field->type());
}
if (context.GetMetaProto().HasMaxValue()) {
AFL_VERIFY(field)("field_id", context.GetAddress().GetColumnId())("field_name", indexInfo.GetColumnName(context.GetAddress().GetColumnId()));
Max = ConstantToScalar(context.GetMetaProto().GetMaxValue(), field->type());
Expand All @@ -37,9 +33,9 @@ NKikimrTxColumnShard::TIndexColumnMeta TChunkMeta::SerializeToProto() const {
if (RawBytes) {
meta.SetRawBytes(*RawBytes);
}
if (HasMinMax()) {
ScalarToConstant(*Min, *meta.MutableMinValue());
if (HasMax()) {
ScalarToConstant(*Max, *meta.MutableMaxValue());
ScalarToConstant(*Max, *meta.MutableMinValue());
}
return meta;
}
Expand Down
36 changes: 0 additions & 36 deletions ydb/core/tx/columnshard/engines/portions/portion_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,21 +43,6 @@ void TPortionInfo::AddMetadata(const ISnapshotSchema& snapshotSchema, const NArr
Meta.SetTierName(tierName);
}

std::shared_ptr<arrow::Scalar> TPortionInfo::MinValue(ui32 columnId) const {
std::shared_ptr<arrow::Scalar> result;
for (auto&& i : Records) {
if (i.ColumnId == columnId) {
if (!i.GetMeta().GetMin()) {
return nullptr;
}
if (!result || NArrow::ScalarCompare(result, i.GetMeta().GetMin()) > 0) {
result = i.GetMeta().GetMin();
}
}
}
return result;
}

std::shared_ptr<arrow::Scalar> TPortionInfo::MaxValue(ui32 columnId) const {
std::shared_ptr<arrow::Scalar> result;
for (auto&& i : Records) {
Expand Down Expand Up @@ -130,14 +115,6 @@ ui64 TPortionInfo::GetIndexBytes(const std::set<ui32>& entityIds) const {
return sum;
}

int TPortionInfo::CompareSelfMaxItemMinByPk(const TPortionInfo& item, const TIndexInfo& info) const {
return CompareByColumnIdsImpl<TMaxGetter, TMinGetter>(item, info.KeyColumns);
}

int TPortionInfo::CompareMinByPk(const TPortionInfo& item, const TIndexInfo& info) const {
return CompareMinByColumnIds(item, info.KeyColumns);
}

TString TPortionInfo::DebugString(const bool withDetails) const {
TStringBuilder sb;
sb << "(portion_id:" << Portion << ";" <<
Expand Down Expand Up @@ -179,19 +156,6 @@ void TPortionInfo::AddRecord(const TIndexInfo& indexInfo, const TColumnRecord& r
}
}

bool TPortionInfo::HasPkMinMax() const {
bool result = false;
for (auto&& i : Records) {
if (i.ColumnId == Meta.FirstPkColumn) {
if (!i.GetMeta().HasMinMax()) {
return false;
}
result = true;
}
}
return result;
}

std::vector<const NKikimr::NOlap::TColumnRecord*> TPortionInfo::GetColumnChunksPointers(const ui32 columnId) const {
std::vector<const TColumnRecord*> result;
for (auto&& c : Records) {
Expand Down
45 changes: 1 addition & 44 deletions ydb/core/tx/columnshard/engines/portions/portion_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ class TPortionInfo {
TSnapshot MinSnapshot = TSnapshot::Zero(); // {PlanStep, TxId} is min snapshot for {Granule, Portion}
TSnapshot RemoveSnapshot = TSnapshot::Zero(); // {XPlanStep, XTxId} is snapshot where the blob has been removed (i.e. compacted into another one)

bool HasPkMinMax() const;
TPortionMeta Meta;
std::shared_ptr<NOlap::IBlobsStorageOperator> BlobsOperator;
ui64 DeprecatedGranuleId = 0;
Expand Down Expand Up @@ -190,7 +189,7 @@ class TPortionInfo {

bool Empty() const { return Records.empty(); }
bool Produced() const { return Meta.GetProduced() != TPortionMeta::EProduced::UNSPECIFIED; }
bool Valid() const { return MinSnapshot.Valid() && PathId && Portion && !Empty() && Produced() && HasPkMinMax() && Meta.IndexKeyStart && Meta.IndexKeyEnd; }
bool Valid() const { return MinSnapshot.Valid() && PathId && Portion && !Empty() && Produced() && Meta.IndexKeyStart && Meta.IndexKeyEnd; }
bool ValidSnapshotInfo() const { return MinSnapshot.Valid() && PathId && Portion; }
bool IsInserted() const { return Meta.GetProduced() == TPortionMeta::EProduced::INSERTED; }
bool IsEvicted() const { return Meta.GetProduced() == TPortionMeta::EProduced::EVICTED; }
Expand Down Expand Up @@ -338,7 +337,6 @@ class TPortionInfo {
void AddMetadata(const ISnapshotSchema& snapshotSchema, const NArrow::TFirstLastSpecialKeys& primaryKeys, const NArrow::TMinMaxSpecialKeys& snapshotKeys,
const TString& tierName);

std::shared_ptr<arrow::Scalar> MinValue(ui32 columnId) const;
std::shared_ptr<arrow::Scalar> MaxValue(ui32 columnId) const;

const NArrow::TReplaceKey& IndexKeyStart() const {
Expand Down Expand Up @@ -414,48 +412,7 @@ class TPortionInfo {
return GetRawBytes();
}

private:
class TMinGetter {
public:
static std::shared_ptr<arrow::Scalar> Get(const TPortionInfo& portionInfo, const ui32 columnId) {
return portionInfo.MinValue(columnId);
}
};

class TMaxGetter {
public:
static std::shared_ptr<arrow::Scalar> Get(const TPortionInfo& portionInfo, const ui32 columnId) {
return portionInfo.MaxValue(columnId);
}
};

template <class TSelfGetter, class TItemGetter = TSelfGetter>
int CompareByColumnIdsImpl(const TPortionInfo& item, const std::vector<ui32>& columnIds) const {
for (auto&& i : columnIds) {
std::shared_ptr<arrow::Scalar> valueSelf = TSelfGetter::Get(*this, i);
std::shared_ptr<arrow::Scalar> valueItem = TItemGetter::Get(item, i);
if (!!valueSelf && !!valueItem) {
const int cmpResult = NArrow::ScalarCompare(valueSelf, valueItem);
if (cmpResult) {
return cmpResult;
}
} else if (!!valueSelf) {
return 1;
} else if (!!valueItem) {
return -1;
}
}
return 0;
}
public:
int CompareSelfMaxItemMinByPk(const TPortionInfo& item, const TIndexInfo& info) const;

int CompareMinByPk(const TPortionInfo& item, const TIndexInfo& info) const;

int CompareMinByColumnIds(const TPortionInfo& item, const std::vector<ui32>& columnIds) const {
return CompareByColumnIdsImpl<TMinGetter>(item, columnIds);
}

class TAssembleBlobInfo {
private:
ui32 NullRowsCount = 0;
Expand Down
7 changes: 6 additions & 1 deletion ydb/core/tx/columnshard/normalizer/abstract/abstract.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ namespace NKikimr::NOlap {
class TNormalizationContext {
YDB_ACCESSOR_DEF(TActorId, ResourceSubscribeActor);
YDB_ACCESSOR_DEF(TActorId, ColumnshardActor);
std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TResourcesGuard> ResourcesGuard;
public:
void SetResourcesGuard(std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TResourcesGuard> rg) {
ResourcesGuard = rg;
}
};

class TNormalizationController;
Expand Down Expand Up @@ -117,7 +122,7 @@ namespace NKikimr::NOlap {
TString DebugString() const {
return TStringBuilder() << "normalizers_count=" << Normalizers.size()
<< ";current_normalizer_idx=" << CurrentNormalizerIndex
<< ";current_normalizer=" << (CurrentNormalizerIndex < Normalizers.size()) ? Normalizers[CurrentNormalizerIndex]->GetName() : "";
<< ";current_normalizer=" << (CurrentNormalizerIndex < Normalizers.size() ? Normalizers[CurrentNormalizerIndex]->GetName() : "");
}

const INormalizerComponent::TPtr& GetNormalizer() const;
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/normalizer/portion/chunks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class TRowsAndBytesChangesTask: public NConveyor::ITask {
}

public:
TRowsAndBytesChangesTask(THashMap<NKikimr::NOlap::TBlobRange, TString>&& blobs, const TNormalizationContext& nCtx, std::vector<TChunksNormalizer::TChunkInfo>&& chunks, THashMap<ui64, ISnapshotSchema::TPtr>&&)
TRowsAndBytesChangesTask(THashMap<NKikimr::NOlap::TBlobRange, TString>&& blobs, const TNormalizationContext& nCtx, std::vector<TChunksNormalizer::TChunkInfo>&& chunks, std::shared_ptr<THashMap<ui64, ISnapshotSchema::TPtr>>)
: Blobs(std::move(blobs))
, Chunks(std::move(chunks))
, NormContext(nCtx)
Expand Down
18 changes: 9 additions & 9 deletions ydb/core/tx/columnshard/normalizer/portion/min_max.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,16 @@ class TMinMaxSnapshotChangesTask: public NConveyor::ITask {
private:
THashMap<NKikimr::NOlap::TBlobRange, TString> Blobs;
TDataContainer Portions;
THashMap<ui64, ISnapshotSchema::TPtr> Schemas;
std::shared_ptr<THashMap<ui64, ISnapshotSchema::TPtr>> Schemas;
TNormalizationContext NormContext;
protected:
virtual bool DoExecute() override {
Y_ABORT_UNLESS(!Schemas.empty());
auto pkColumnIds = Schemas.begin()->second->GetPkColumnsIds();
Y_ABORT_UNLESS(!Schemas->empty());
auto pkColumnIds = Schemas->begin()->second->GetPkColumnsIds();
pkColumnIds.insert(TIndexInfo::GetSpecialColumnIds().begin(), TIndexInfo::GetSpecialColumnIds().end());

for (auto&& portionInfo : Portions) {
auto blobSchema = Schemas.FindPtr(portionInfo->GetPortionId());
auto blobSchema = Schemas->FindPtr(portionInfo->GetPortionId());
THashMap<TBlobRange, TPortionInfo::TAssembleBlobInfo> blobsDataAssemble;
for (auto&& i : portionInfo->Records) {
auto blobIt = Blobs.find(i.BlobRange);
Expand All @@ -47,10 +47,10 @@ class TMinMaxSnapshotChangesTask: public NConveyor::ITask {
}

public:
TMinMaxSnapshotChangesTask(THashMap<NKikimr::NOlap::TBlobRange, TString>&& blobs, const TNormalizationContext& nCtx, TDataContainer&& portions, THashMap<ui64, ISnapshotSchema::TPtr>&& schemas)
TMinMaxSnapshotChangesTask(THashMap<NKikimr::NOlap::TBlobRange, TString>&& blobs, const TNormalizationContext& nCtx, TDataContainer&& portions, std::shared_ptr<THashMap<ui64, ISnapshotSchema::TPtr>> schemas)
: Blobs(std::move(blobs))
, Portions(std::move(portions))
, Schemas(std::move(schemas))
, Schemas(schemas)
, NormContext(nCtx)
{}

Expand Down Expand Up @@ -135,7 +135,7 @@ TConclusion<std::vector<INormalizerTask::TPtr>> TPortionsNormalizer::Init(const
}

THashMap<ui64, std::shared_ptr<TPortionInfo>> portions;
THashMap<ui64, ISnapshotSchema::TPtr> schemas;
auto schemas = std::make_shared<THashMap<ui64, ISnapshotSchema::TPtr>>();
auto pkColumnIds = TMinMaxSnapshotChangesTask::GetColumnsFilter(tablesManager.GetPrimaryIndexSafe().GetVersionedIndex().GetLastSchema());

{
Expand All @@ -161,7 +161,7 @@ TConclusion<std::vector<INormalizerTask::TPtr>> TPortionsNormalizer::Init(const
auto portionMeta = loadContext.GetPortionMeta();
if (it == portions.end()) {
Y_ABORT_UNLESS(portion.Records.empty());
schemas[portion.GetPortionId()] = currentSchema;
(*schemas)[portion.GetPortionId()] = currentSchema;
auto portionNew = std::make_shared<TPortionInfo>(portion);
portionNew->AddRecord(currentSchema->GetIndexInfo(), rec, portionMeta);
it = portions.emplace(portion.GetPortion(), portionNew).first;
Expand Down Expand Up @@ -202,7 +202,7 @@ TConclusion<std::vector<INormalizerTask::TPtr>> TPortionsNormalizer::Init(const
}
++brokenPortioncCount;
package.emplace_back(portion.second);
if (package.size() == 100) {
if (package.size() == 1000) {
std::vector<std::shared_ptr<TPortionInfo>> local;
local.swap(package);
tasks.emplace_back(std::make_shared<TPortionsNormalizerTask<TMinMaxSnapshotChangesTask>>(std::move(local), schemas));
Expand Down
14 changes: 7 additions & 7 deletions ydb/core/tx/columnshard/normalizer/portion/normalizer.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ class TReadPortionsTask: public NOlap::NBlobOperations::NRead::ITask {
private:
using TBase = NOlap::NBlobOperations::NRead::ITask;
typename TConveyorTask::TDataContainer Data;
THashMap<ui64, ISnapshotSchema::TPtr> Schemas;
std::shared_ptr<THashMap<ui64, ISnapshotSchema::TPtr>> Schemas;
TNormalizationContext NormContext;

public:
TReadPortionsTask(const TNormalizationContext& nCtx, const std::vector<std::shared_ptr<IBlobsReadingAction>>& actions, typename TConveyorTask::TDataContainer&& data, THashMap<ui64, ISnapshotSchema::TPtr>&& schemas)
TReadPortionsTask(const TNormalizationContext& nCtx, const std::vector<std::shared_ptr<IBlobsReadingAction>>& actions, typename TConveyorTask::TDataContainer&& data, std::shared_ptr<THashMap<ui64, ISnapshotSchema::TPtr>> schemas)
: TBase(actions, "CS::NORMALIZER")
, Data(std::move(data))
, Schemas(std::move(schemas))
Expand All @@ -32,8 +32,8 @@ class TReadPortionsTask: public NOlap::NBlobOperations::NRead::ITask {

protected:
virtual void DoOnDataReady(const std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TResourcesGuard>& resourcesGuard) override {
Y_UNUSED(resourcesGuard);
std::shared_ptr<NConveyor::ITask> task = std::make_shared<TConveyorTask>(std::move(ExtractBlobsData()), NormContext, std::move(Data), std::move(Schemas));
NormContext.SetResourcesGuard(resourcesGuard);
std::shared_ptr<NConveyor::ITask> task = std::make_shared<TConveyorTask>(std::move(ExtractBlobsData()), NormContext, std::move(Data), Schemas);
NConveyor::TCompServiceOperator::SendTaskToExecute(task);
}

Expand All @@ -49,13 +49,13 @@ class TReadPortionsTask: public NOlap::NBlobOperations::NRead::ITask {
template <class TConveyorTask>
class TPortionsNormalizerTask : public INormalizerTask {
typename TConveyorTask::TDataContainer Package;
THashMap<ui64, ISnapshotSchema::TPtr> Schemas;
std::shared_ptr<THashMap<ui64, ISnapshotSchema::TPtr>> Schemas;
public:
TPortionsNormalizerTask(typename TConveyorTask::TDataContainer&& package)
: Package(std::move(package))
{}

TPortionsNormalizerTask(typename TConveyorTask::TDataContainer&& package, const THashMap<ui64, ISnapshotSchema::TPtr>& schemas)
TPortionsNormalizerTask(typename TConveyorTask::TDataContainer&& package, const std::shared_ptr<THashMap<ui64, ISnapshotSchema::TPtr>> schemas)
: Package(std::move(package))
, Schemas(schemas)
{}
Expand All @@ -71,7 +71,7 @@ class TPortionsNormalizerTask : public INormalizerTask {
std::vector<std::shared_ptr<IBlobsReadingAction>> actions = {readingAction};
NOlap::NResourceBroker::NSubscribe::ITask::StartResourceSubscription(
nCtx.GetResourceSubscribeActor(),std::make_shared<NOlap::NBlobOperations::NRead::ITask::TReadSubscriber>(
std::make_shared<TReadPortionsTask<TConveyorTask>>( nCtx, actions, std::move(Package), std::move(Schemas) ), 1, memSize, "CS::NORMALIZER", controller.GetTaskSubscription()));
std::make_shared<TReadPortionsTask<TConveyorTask>>(nCtx, actions, std::move(Package), Schemas), 1, memSize, "CS::NORMALIZER", controller.GetTaskSubscription()));
}
};
}
6 changes: 2 additions & 4 deletions ydb/core/tx/columnshard/splitter/chunk_meta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,22 @@

namespace NKikimr::NOlap {

TSimpleChunkMeta::TSimpleChunkMeta(const std::shared_ptr<arrow::Array>& column, const bool needMinMax, const bool isSortedColumn) {
TSimpleChunkMeta::TSimpleChunkMeta(const std::shared_ptr<arrow::Array>& column, const bool needMax, const bool isSortedColumn) {
Y_ABORT_UNLESS(column);
Y_ABORT_UNLESS(column->length());
NumRows = column->length();
RawBytes = NArrow::GetArrayDataSize(column);

if (needMinMax) {
if (needMax) {
std::pair<i32, i32> minMaxPos = {0, (column->length() - 1)};
if (!isSortedColumn) {
minMaxPos = NArrow::FindMinMaxPosition(column);
Y_ABORT_UNLESS(minMaxPos.first >= 0);
Y_ABORT_UNLESS(minMaxPos.second >= 0);
}

Min = NArrow::GetScalar(column, minMaxPos.first);
Max = NArrow::GetScalar(column, minMaxPos.second);

Y_ABORT_UNLESS(Min);
Y_ABORT_UNLESS(Max);
}
}
Expand Down
8 changes: 2 additions & 6 deletions ydb/core/tx/columnshard/splitter/chunk_meta.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ namespace NKikimr::NOlap {

class TSimpleChunkMeta {
protected:
std::shared_ptr<arrow::Scalar> Min;
std::shared_ptr<arrow::Scalar> Max;
std::optional<ui32> NumRows;
std::optional<ui32> RawBytes;
Expand All @@ -25,9 +24,6 @@ class TSimpleChunkMeta {
return sizeof(ui32) + sizeof(ui32) + 8 * 3 * 2;
}

std::shared_ptr<arrow::Scalar> GetMin() const {
return Min;
}
std::shared_ptr<arrow::Scalar> GetMax() const {
return Max;
}
Expand All @@ -49,8 +45,8 @@ class TSimpleChunkMeta {
return *RawBytes;
}

bool HasMinMax() const noexcept {
return Min.get() && Max.get();
bool HasMax() const noexcept {
return Max.get();
}

};
Expand Down