Skip to content

speed up merging with correct pointers operation #7230

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions ydb/core/formats/arrow/reader/merger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@

namespace NKikimr::NArrow::NMerger {

void TMergePartialStream::PutControlPoint(const TSortableBatchPosition& point) {
void TMergePartialStream::PutControlPoint(const TSortableBatchPosition& point, const bool deepCopy) {
AFL_VERIFY(point.IsSameSortingSchema(SortSchema))("point", point.DebugJson())("schema", SortSchema->ToString());
Y_ABORT_UNLESS(point.IsReverseSort() == Reverse);
Y_ABORT_UNLESS(++ControlPoints == 1);

SortHeap.Push(TBatchIterator(point.BuildRWPosition()));
SortHeap.Push(TBatchIterator(point.BuildRWPosition(false, deepCopy)));
}

void TMergePartialStream::RemoveControlPoint() {
Expand Down Expand Up @@ -65,7 +65,7 @@ bool TMergePartialStream::DrainToControlPoint(TRecordBatchBuilder& builder, cons
}

bool TMergePartialStream::DrainCurrentTo(TRecordBatchBuilder& builder, const TSortableBatchPosition& readTo, const bool includeFinish, std::optional<TCursor>* lastResultPosition) {
PutControlPoint(readTo);
PutControlPoint(readTo, false);
return DrainToControlPoint(builder, includeFinish, lastResultPosition);
}

Expand Down Expand Up @@ -191,6 +191,9 @@ std::vector<std::shared_ptr<arrow::RecordBatch>> TMergePartialStream::DrainAllPa
std::vector<std::shared_ptr<arrow::RecordBatch>> result;
for (auto&& i : positions) {
TRecordBatchBuilder indexesBuilder(resultFields);
if (SortHeap.Empty() || i.GetPosition().Compare(SortHeap.Current().GetKeyColumns()) == std::partial_ordering::less) {
continue;
}
DrainCurrentTo(indexesBuilder, i.GetPosition(), i.IsIncludedToLeftInterval());
result.emplace_back(indexesBuilder.Finalize());
if (result.back()->num_rows() == 0) {
Expand Down
7 changes: 4 additions & 3 deletions ydb/core/formats/arrow/reader/merger.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ class TMergePartialStream {
void DrainCurrentPosition(TRecordBatchBuilder* builder, std::shared_ptr<TSortableScanData>* resultScanData, ui64* resultPosition);

void CheckSequenceInDebug(const TRWSortableBatchPosition& nextKeyColumnsPosition);
bool DrainCurrentTo(TRecordBatchBuilder& builder, const TSortableBatchPosition& readTo, const bool includeFinish,
std::optional<TCursor>* lastResultPosition = nullptr);

public:
TMergePartialStream(std::shared_ptr<arrow::Schema> sortSchema, std::shared_ptr<arrow::Schema> dataSchema, const bool reverse, const std::vector<std::string>& versionColumnNames)
: SortSchema(sortSchema)
Expand All @@ -49,6 +52,7 @@ class TMergePartialStream {
Y_ABORT_UNLESS(!DataSchema || DataSchema->num_fields());
}

void PutControlPoint(const TSortableBatchPosition& point, const bool deepCopy);
void SkipToLowerBound(const TSortableBatchPosition& pos, const bool include);

void SetPossibleSameVersion(const bool value) {
Expand All @@ -67,8 +71,6 @@ class TMergePartialStream {
return TStringBuilder() << "sort_heap=" << SortHeap.DebugJson();
}

void PutControlPoint(const TSortableBatchPosition& point);

void RemoveControlPoint();

bool ControlPointEnriched() const {
Expand All @@ -92,7 +94,6 @@ class TMergePartialStream {

void DrainAll(TRecordBatchBuilder& builder);
std::shared_ptr<arrow::Table> SingleSourceDrain(const TSortableBatchPosition& readTo, const bool includeFinish, std::optional<TCursor>* lastResultPosition = nullptr);
bool DrainCurrentTo(TRecordBatchBuilder& builder, const TSortableBatchPosition& readTo, const bool includeFinish, std::optional<TCursor>* lastResultPosition = nullptr);
bool DrainToControlPoint(TRecordBatchBuilder& builder, const bool includeFinish, std::optional<TCursor>* lastResultPosition = nullptr);
std::vector<std::shared_ptr<arrow::RecordBatch>> DrainAllParts(const TIntervalPositions& positions,
const std::vector<std::shared_ptr<arrow::Field>>& resultFields);
Expand Down
7 changes: 4 additions & 3 deletions ydb/core/formats/arrow/reader/position.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,10 @@ std::optional<TSortableBatchPosition::TFoundPosition> TSortableBatchPosition::Fi
return FindPosition(position, posStart, posFinish, forFound, greater);
}

NKikimr::NArrow::NMerger::TRWSortableBatchPosition TSortableBatchPosition::BuildRWPosition() const {
return TRWSortableBatchPosition(
Position, RecordsCount, ReverseSort, Sorting->BuildCopy(Position), Data ? Data->BuildCopy(Position) : nullptr);
NKikimr::NArrow::NMerger::TRWSortableBatchPosition TSortableBatchPosition::BuildRWPosition(const bool needData, const bool deepCopy) const {
return TRWSortableBatchPosition(Position, RecordsCount, ReverseSort,
deepCopy ? Sorting->BuildCopy(Position) : Sorting,
(needData && Data) ? (deepCopy ? Data->BuildCopy(Position) : Data) : nullptr);
}

NKikimr::NArrow::NMerger::TRWSortableBatchPosition TSortableBatchPosition::BuildRWPosition(
Expand Down
26 changes: 14 additions & 12 deletions ydb/core/formats/arrow/reader/position.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ class TSortableScanData {
}

std::shared_ptr<TSortableScanData> BuildCopy(const ui64 position) const {
return std::make_shared<TSortableScanData>(position, RecordsCount, Columns, Fields);
return std::make_shared<TSortableScanData>(*this);
}

TCursor BuildCursor(const ui64 position) const {
Expand Down Expand Up @@ -209,6 +209,17 @@ class TSortableBatchPosition {
bool ReverseSort = false;
std::shared_ptr<TSortableScanData> Sorting;
std::shared_ptr<TSortableScanData> Data;

TSortableBatchPosition(const i64 position, const i64 recordsCount, const bool reverseSort, const std::shared_ptr<TSortableScanData>& sorting,
const std::shared_ptr<TSortableScanData>& data)
: Position(position)
, RecordsCount(recordsCount)
, ReverseSort(reverseSort)
, Sorting(sorting)
, Data(data) {
AFL_VERIFY(IsAvailablePosition(Position));
}

public:
TSortableBatchPosition() = default;

Expand All @@ -220,7 +231,7 @@ class TSortableBatchPosition {
return RecordsCount;
}

std::shared_ptr<TSortableScanData> GetSorting() const {
const std::shared_ptr<TSortableScanData>& GetSorting() const {
return Sorting;
}

Expand All @@ -239,15 +250,6 @@ class TSortableBatchPosition {
return Sorting->GetFields();
}

TSortableBatchPosition(const i64 position, const i64 recordsCount, const bool reverseSort, const std::shared_ptr<TSortableScanData>& sorting, const std::shared_ptr<TSortableScanData>& data)
: Position(position)
, RecordsCount(recordsCount)
, ReverseSort(reverseSort)
, Sorting(sorting)
, Data(data) {
AFL_VERIFY(IsAvailablePosition(Position));
}

TSortableBatchPosition(const TRWSortableBatchPosition& source) = delete;
TSortableBatchPosition(TRWSortableBatchPosition& source) = delete;
TSortableBatchPosition(TRWSortableBatchPosition&& source) = delete;
Expand All @@ -256,7 +258,7 @@ class TSortableBatchPosition {
TSortableBatchPosition operator= (TRWSortableBatchPosition& source) = delete;
TSortableBatchPosition operator= (TRWSortableBatchPosition&& source) = delete;

TRWSortableBatchPosition BuildRWPosition() const;
TRWSortableBatchPosition BuildRWPosition(const bool needData, const bool deepCopy) const;

std::shared_ptr<arrow::Table> SliceData(const ui64 offset, const ui64 count) const {
AFL_VERIFY(Data);
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/formats/arrow/reader/result_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ std::shared_ptr<arrow::RecordBatch> TRecordBatchBuilder::Finalize() {
for (auto&& i : Builders) {
columns.emplace_back(NArrow::TStatusValidator::GetValid(i->Finish()));
}
auto result = arrow::RecordBatch::Make(schema, columns.front()->length(), columns);
auto result = arrow::RecordBatch::Make(schema, columns.front()->length(), std::move(columns));
#ifndef NDEBUG
NArrow::TStatusValidator::Validate(result->ValidateFull());
#endif
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ TConclusionStatus TStartMergeTask::DoExecuteImpl() {
return TConclusionStatus::Success();
}
}
Merger->PutControlPoint(MergingContext->GetFinish());
Merger->PutControlPoint(MergingContext->GetFinish(), false);
Merger->SkipToLowerBound(MergingContext->GetStart(), MergingContext->GetIncludeStart());
const ui32 originalSourcesCount = Sources.size();
Sources.clear();
Expand Down
Loading