Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions ydb/core/tx/columnshard/engines/predicate/filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,15 @@ bool TPKRangesFilter::IsPortionInUsage(const TPortionInfo& info, const TIndexInf
return SortedRanges.empty();
}

bool TPKRangesFilter::IsPortionInPartialUsage(const NArrow::TReplaceKey& start, const NArrow::TReplaceKey& end, const TIndexInfo& indexInfo) const {
for (auto&& i : SortedRanges) {
if (i.IsPortionInPartialUsage(start, end, indexInfo)) {
return true;
}
}
return false;
}

TPKRangesFilter::TPKRangesFilter(const bool reverse)
: ReverseFlag(reverse)
{
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/columnshard/engines/predicate/filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class TPKRangesFilter {
}

bool IsPortionInUsage(const TPortionInfo& info, const TIndexInfo& indexInfo) const;
bool IsPortionInPartialUsage(const NArrow::TReplaceKey& start, const NArrow::TReplaceKey& end, const TIndexInfo& indexInfo) const;

NArrow::TColumnFilter BuildFilter(const arrow::Datum& data) const;

Expand Down
31 changes: 31 additions & 0 deletions ydb/core/tx/columnshard/engines/predicate/range.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,37 @@ bool TPKRangeFilter::IsPortionInUsage(const TPortionInfo& info, const TIndexInfo
return true;
}

bool TPKRangeFilter::IsPortionInPartialUsage(const NArrow::TReplaceKey& start, const NArrow::TReplaceKey& end, const TIndexInfo& indexInfo) const {
bool startUsage = false;
bool endUsage = false;
if (auto from = PredicateFrom.ExtractKey(indexInfo.GetPrimaryKey())) {
AFL_VERIFY(from->Size() <= start.Size());
if (PredicateFrom.IsInclude()) {
startUsage = std::is_lt(start.ComparePartNotNull(*from, from->Size()));
} else {
startUsage = std::is_lteq(start.ComparePartNotNull(*from, from->Size()));
}
} else {
startUsage = true;
}

if (auto to = PredicateTo.ExtractKey(indexInfo.GetPrimaryKey())) {
AFL_VERIFY(to->Size() <= end.Size());
if (PredicateTo.IsInclude()) {
endUsage = std::is_gt(end.ComparePartNotNull(*to, to->Size()));
} else {
endUsage = std::is_gteq(end.ComparePartNotNull(*to, to->Size()));
}
} else {
endUsage = true;
}

// AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("start", start.DebugString())("end", end.DebugString())("from", PredicateFrom.DebugString())("to", PredicateTo.DebugString())
// ("start_usage", startUsage)("end_usage", endUsage);

return endUsage || startUsage;
}

std::optional<NKikimr::NOlap::TPKRangeFilter> TPKRangeFilter::Build(TPredicateContainer&& from, TPredicateContainer&& to) {
if (!from.CrossRanges(to)) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "cannot_build_predicate_range")("error", "predicates from/to not intersected");
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/columnshard/engines/predicate/range.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class TPKRangeFilter {
NArrow::TColumnFilter BuildFilter(const arrow::Datum& data) const;

bool IsPortionInUsage(const TPortionInfo& info, const TIndexInfo& indexInfo) const;
bool IsPortionInPartialUsage(const NArrow::TReplaceKey& start, const NArrow::TReplaceKey& end, const TIndexInfo& indexInfo) const;

std::set<ui32> GetColumnIds(const TIndexInfo& indexInfo) const;
TString DebugString() const;
Expand Down
45 changes: 33 additions & 12 deletions ydb/core/tx/columnshard/engines/reader/plain_reader/context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,23 @@ ui64 TSpecialReadContext::GetMemoryForSources(const std::map<ui32, std::shared_p

std::shared_ptr<NKikimr::NOlap::NPlainReader::IFetchingStep> TSpecialReadContext::GetColumnsFetchingPlan(const std::shared_ptr<IDataSource>& source, const bool exclusiveSource) const {
const bool needSnapshots = !exclusiveSource || ReadMetadata->GetSnapshot() < source->GetRecordSnapshotMax();
auto result = CacheFetchingScripts[needSnapshots ? 1 : 0][exclusiveSource ? 1 : 0];
const bool partialUsageByPK = ReadMetadata->GetPKRangesFilter().IsPortionInPartialUsage(source->GetStartReplaceKey(), source->GetFinishReplaceKey(), ReadMetadata->GetIndexInfo());
auto result = CacheFetchingScripts[needSnapshots ? 1 : 0][exclusiveSource ? 1 : 0][partialUsageByPK ? 1 : 0];
if (!result) {
return std::make_shared<TBuildFakeSpec>(source->GetRecordsCount(), "fake");
}
return result;
}

std::shared_ptr<NKikimr::NOlap::NPlainReader::IFetchingStep> TSpecialReadContext::BuildColumnsFetchingPlan(const bool needSnapshots, const bool exclusiveSource) const {
std::shared_ptr<NKikimr::NOlap::NPlainReader::IFetchingStep> TSpecialReadContext::BuildColumnsFetchingPlan(const bool needSnapshots, const bool exclusiveSource, const bool partialUsageByPredicateExt) const {
std::shared_ptr<IFetchingStep> result = std::make_shared<TFakeStep>();
std::shared_ptr<IFetchingStep> current = result;
const bool partialUsageByPredicate = partialUsageByPredicateExt && PredicateColumns->GetColumnsCount();
if (!!IndexChecker) {
current = current->AttachNext(std::make_shared<TBlobsFetchingStep>(std::make_shared<TIndexesSet>(IndexChecker->GetIndexIds())));
current = current->AttachNext(std::make_shared<TApplyIndexStep>(IndexChecker));
}
if (!EFColumns->GetColumnsCount()) {
if (!EFColumns->GetColumnsCount() && !partialUsageByPredicate) {
TColumnsSet columnsFetch = *FFColumns;
if (needSnapshots) {
columnsFetch = columnsFetch + *SpecColumns;
Expand All @@ -52,6 +54,9 @@ std::shared_ptr<NKikimr::NOlap::NPlainReader::IFetchingStep> TSpecialReadContext
if (needSnapshots || FFColumns->Contains(SpecColumns)) {
columnsFetch = columnsFetch + *SpecColumns;
}
if (partialUsageByPredicate) {
columnsFetch = columnsFetch + *PredicateColumns;
}
AFL_VERIFY(columnsFetch.GetColumnsCount());
current = current->AttachNext(std::make_shared<TBlobsFetchingStep>(std::make_shared<TColumnsSet>(columnsFetch), "ef"));

Expand All @@ -60,17 +65,21 @@ std::shared_ptr<NKikimr::NOlap::NPlainReader::IFetchingStep> TSpecialReadContext
current = current->AttachNext(std::make_shared<TSnapshotFilter>());
columnsFetch = columnsFetch - *SpecColumns;
}
current = current->AttachNext(std::make_shared<TAssemblerStep>(std::make_shared<TColumnsSet>(columnsFetch)));
if (!ReadMetadata->GetPKRangesFilter().IsEmpty()) {
if (partialUsageByPredicate) {
current = current->AttachNext(std::make_shared<TAssemblerStep>(PredicateColumns));
current = current->AttachNext(std::make_shared<TPredicateFilter>());
columnsFetch = columnsFetch - *PredicateColumns;
}
if (columnsFetch.GetColumnsCount()) {
current = current->AttachNext(std::make_shared<TAssemblerStep>(std::make_shared<TColumnsSet>(columnsFetch)));
}
for (auto&& i : ReadMetadata->GetProgram().GetSteps()) {
if (!i->IsFilterOnly()) {
break;
}
current = current->AttachNext(std::make_shared<TFilterProgramStep>(i));
}
const TColumnsSet columnsAdditionalFetch = *FFColumns - *EFColumns - *SpecColumns;
const TColumnsSet columnsAdditionalFetch = *FFColumns - *EFColumns - *SpecColumns - *PredicateColumns;
if (columnsAdditionalFetch.GetColumnsCount()) {
current = current->AttachNext(std::make_shared<TBlobsFetchingStep>(std::make_shared<TColumnsSet>(columnsAdditionalFetch)));
current = current->AttachNext(std::make_shared<TAssemblerStep>(std::make_shared<TColumnsSet>(columnsAdditionalFetch)));
Expand All @@ -84,7 +93,7 @@ std::shared_ptr<NKikimr::NOlap::NPlainReader::IFetchingStep> TSpecialReadContext
current = current->AttachNext(std::make_shared<TSnapshotFilter>());
}
current = current->AttachNext(std::make_shared<TAssemblerStep>(PKColumns));
if (!ReadMetadata->GetPKRangesFilter().IsEmpty()) {
if (partialUsageByPredicate) {
current = current->AttachNext(std::make_shared<TPredicateFilter>());
}
const TColumnsSet columnsFetchEF = columnsFetch - *SpecColumns - *PKColumns;
Expand All @@ -95,7 +104,7 @@ std::shared_ptr<NKikimr::NOlap::NPlainReader::IFetchingStep> TSpecialReadContext
}
current = current->AttachNext(std::make_shared<TFilterProgramStep>(i));
}
const TColumnsSet columnsAdditionalFetch = *FFColumns - *EFColumns - *SpecColumns - *PKColumns;
const TColumnsSet columnsAdditionalFetch = *FFColumns - *EFColumns - *SpecColumns - *PKColumns - *PredicateColumns;
if (columnsAdditionalFetch.GetColumnsCount()) {
current = current->AttachNext(std::make_shared<TBlobsFetchingStep>(std::make_shared<TColumnsSet>(columnsAdditionalFetch)));
current = current->AttachNext(std::make_shared<TAssemblerStep>(std::make_shared<TColumnsSet>(columnsAdditionalFetch)));
Expand All @@ -114,6 +123,14 @@ TSpecialReadContext::TSpecialReadContext(const std::shared_ptr<TReadContext>& co
auto readSchema = ReadMetadata->GetLoadSchema(ReadMetadata->GetSnapshot());
SpecColumns = std::make_shared<TColumnsSet>(TIndexInfo::GetSpecialColumnIdsSet(), ReadMetadata->GetIndexInfo(), readSchema);
IndexChecker = ReadMetadata->GetProgram().GetIndexChecker();
{
auto predicateColumns = ReadMetadata->GetPKRangesFilter().GetColumnIds(ReadMetadata->GetIndexInfo());
if (predicateColumns.size()) {
PredicateColumns = std::make_shared<TColumnsSet>(predicateColumns, ReadMetadata->GetIndexInfo(), readSchema);
} else {
PredicateColumns = std::make_shared<TColumnsSet>();
}
}
{
auto efColumns = ReadMetadata->GetEarlyFilterColumnIds();
if (efColumns.size()) {
Expand Down Expand Up @@ -144,10 +161,14 @@ TSpecialReadContext::TSpecialReadContext(const std::shared_ptr<TReadContext>& co
MergeColumns = std::make_shared<TColumnsSet>(*PKColumns + *SpecColumns);

AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("columns_context_info", DebugString());
CacheFetchingScripts[0][0] = BuildColumnsFetchingPlan(false, false);
CacheFetchingScripts[0][1] = BuildColumnsFetchingPlan(false, true);
CacheFetchingScripts[1][0] = BuildColumnsFetchingPlan(true, false);
CacheFetchingScripts[1][1] = BuildColumnsFetchingPlan(true, true);
CacheFetchingScripts[0][0][0] = BuildColumnsFetchingPlan(false, false, false);
CacheFetchingScripts[0][1][0] = BuildColumnsFetchingPlan(false, true, false);
CacheFetchingScripts[1][0][0] = BuildColumnsFetchingPlan(true, false, false);
CacheFetchingScripts[1][1][0] = BuildColumnsFetchingPlan(true, true, false);
CacheFetchingScripts[0][0][1] = BuildColumnsFetchingPlan(false, false, true);
CacheFetchingScripts[0][1][1] = BuildColumnsFetchingPlan(false, true, true);
CacheFetchingScripts[1][0][1] = BuildColumnsFetchingPlan(true, false, true);
CacheFetchingScripts[1][1][1] = BuildColumnsFetchingPlan(true, true, true);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ class TSpecialReadContext {
YDB_READONLY_DEF(std::shared_ptr<TColumnsSet>, SpecColumns);
YDB_READONLY_DEF(std::shared_ptr<TColumnsSet>, MergeColumns);
YDB_READONLY_DEF(std::shared_ptr<TColumnsSet>, EFColumns);
YDB_READONLY_DEF(std::shared_ptr<TColumnsSet>, PredicateColumns);
YDB_READONLY_DEF(std::shared_ptr<TColumnsSet>, PKColumns);
YDB_READONLY_DEF(std::shared_ptr<TColumnsSet>, FFColumns);
YDB_READONLY_DEF(std::shared_ptr<TColumnsSet>, ProgramInputColumns);
Expand All @@ -25,8 +26,8 @@ class TSpecialReadContext {
std::shared_ptr<TColumnsSet> PKFFColumns;
std::shared_ptr<TColumnsSet> EFPKColumns;
std::shared_ptr<TColumnsSet> FFMinusEFColumns;
std::shared_ptr<IFetchingStep> BuildColumnsFetchingPlan(const bool needSnapshotsFilter, const bool exclusiveSource) const;
std::array<std::array<std::shared_ptr<IFetchingStep>, 2>, 2> CacheFetchingScripts;
std::shared_ptr<IFetchingStep> BuildColumnsFetchingPlan(const bool needSnapshotsFilter, const bool exclusiveSource, const bool partialUsageByPredicate) const;
std::array<std::array<std::array<std::shared_ptr<IFetchingStep>, 2>, 2>, 2> CacheFetchingScripts;
public:
ui64 GetMemoryForSources(const std::map<ui32, std::shared_ptr<IDataSource>>& sources, const bool isExclusive);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,10 @@ TPlainReadData::TPlainReadData(const std::shared_ptr<NOlap::TReadContext>& conte
} else {
insertedPortionsBytes += (*itPortion)->BlobsBytes();
}
auto start = GetReadMetadata()->BuildSortedPosition((*itPortion)->IndexKeyStart());
auto finish = GetReadMetadata()->BuildSortedPosition((*itPortion)->IndexKeyEnd());
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "portions_for_merge")("start", start.DebugJson())("finish", finish.DebugJson());
sources.emplace_back(std::make_shared<TPortionDataSource>(sourceIdx++, *itPortion, SpecialReadContext, start, finish));
sources.emplace_back(std::make_shared<TPortionDataSource>(sourceIdx++, *itPortion, SpecialReadContext, (*itPortion)->IndexKeyStart(), (*itPortion)->IndexKeyEnd()));
++itPortion;
} else {
auto start = GetReadMetadata()->BuildSortedPosition(itCommitted->GetFirstVerified());
auto finish = GetReadMetadata()->BuildSortedPosition(itCommitted->GetLastVerified());
sources.emplace_back(std::make_shared<TCommittedDataSource>(sourceIdx++, *itCommitted, SpecialReadContext, start, finish));
sources.emplace_back(std::make_shared<TCommittedDataSource>(sourceIdx++, *itCommitted, SpecialReadContext, itCommitted->GetFirstVerified(), itCommitted->GetLastVerified()));
committedPortionsBytes += itCommitted->GetSize();
++itCommitted;
}
Expand Down
26 changes: 19 additions & 7 deletions ydb/core/tx/columnshard/engines/reader/plain_reader/source.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ class IDataSource {
YDB_READONLY(ui32, SourceIdx, 0);
YDB_READONLY_DEF(NIndexedReader::TSortableBatchPosition, Start);
YDB_READONLY_DEF(NIndexedReader::TSortableBatchPosition, Finish);
NArrow::TReplaceKey StartReplaceKey;
NArrow::TReplaceKey FinishReplaceKey;
YDB_READONLY_DEF(std::shared_ptr<TSpecialReadContext>, Context);
YDB_READONLY(TSnapshot, RecordSnapshotMax, TSnapshot::Zero());
std::optional<ui32> RecordsCount;
Expand All @@ -52,6 +54,13 @@ class IDataSource {
virtual void DoAbort() = 0;
virtual void DoApplyIndex(const NIndexes::TIndexCheckerContainer& indexMeta) = 0;
public:
const NArrow::TReplaceKey& GetStartReplaceKey() const {
return StartReplaceKey;
}
const NArrow::TReplaceKey& GetFinishReplaceKey() const {
return FinishReplaceKey;
}

const TFetchedResult& GetStageResult() const {
AFL_VERIFY(!!StageResult);
return *StageResult;
Expand Down Expand Up @@ -147,16 +156,19 @@ class IDataSource {
void RegisterInterval(TFetchingInterval& interval);

IDataSource(const ui32 sourceIdx, const std::shared_ptr<TSpecialReadContext>& context,
const NIndexedReader::TSortableBatchPosition& start, const NIndexedReader::TSortableBatchPosition& finish,
const NArrow::TReplaceKey& start, const NArrow::TReplaceKey& finish,
const TSnapshot& recordSnapshotMax, const std::optional<ui32> recordsCount
)
: SourceIdx(sourceIdx)
, Start(start)
, Finish(finish)
, Start(context->GetReadMetadata()->BuildSortedPosition(start))
, Finish(context->GetReadMetadata()->BuildSortedPosition(finish))
, StartReplaceKey(start)
, FinishReplaceKey(finish)
, Context(context)
, RecordSnapshotMax(recordSnapshotMax)
, RecordsCount(recordsCount)
{
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "portions_for_merge")("start", Start.DebugJson())("finish", Finish.DebugJson());
if (Start.IsReverseSort()) {
std::swap(Start, Finish);
}
Expand Down Expand Up @@ -210,10 +222,10 @@ class TPortionDataSource: public IDataSource {
}

TPortionDataSource(const ui32 sourceIdx, const std::shared_ptr<TPortionInfo>& portion, const std::shared_ptr<TSpecialReadContext>& context,
const NIndexedReader::TSortableBatchPosition& start, const NIndexedReader::TSortableBatchPosition& finish)
const NArrow::TReplaceKey& start, const NArrow::TReplaceKey& finish)
: TBase(sourceIdx, context, start, finish, portion->RecordSnapshotMax(), portion->GetRecordsCount())
, Portion(portion) {

, Portion(portion)
{
}
};

Expand Down Expand Up @@ -256,7 +268,7 @@ class TCommittedDataSource: public IDataSource {
}

TCommittedDataSource(const ui32 sourceIdx, const TCommittedBlob& committed, const std::shared_ptr<TSpecialReadContext>& context,
const NIndexedReader::TSortableBatchPosition& start, const NIndexedReader::TSortableBatchPosition& finish)
const NArrow::TReplaceKey& start, const NArrow::TReplaceKey& finish)
: TBase(sourceIdx, context, start, finish, committed.GetSnapshot(), {})
, CommittedBlob(committed) {

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/engines/reader/read_metadata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ bool TReadMetadata::Init(const TReadDescription& readDescription, const TDataSto

std::set<ui32> TReadMetadata::GetEarlyFilterColumnIds() const {
auto& indexInfo = ResultIndexSchema->GetIndexInfo();
std::set<ui32> result = GetPKRangesFilter().GetColumnIds(indexInfo);
std::set<ui32> result;
for (auto&& i : GetProgram().GetEarlyFilterColumns()) {
auto id = indexInfo.GetColumnIdOptional(i);
if (id) {
Expand Down