Skip to content

Commit db0989e

Browse files
speed up records merge
1 parent 0342dc0 commit db0989e

File tree

16 files changed

+545
-208
lines changed

16 files changed

+545
-208
lines changed

ydb/core/formats/arrow/common/accessor.cpp

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,25 +17,27 @@ std::shared_ptr<arrow::Array> IChunkedArray::TReader::CopyRecord(const ui64 reco
1717
return NArrow::CopyRecords(address.GetArray(), {address.GetPosition()});
1818
}
1919

20-
std::shared_ptr<arrow::ChunkedArray> IChunkedArray::TReader::Slice(const ui32 offset, const ui32 count) const {
20+
std::shared_ptr<arrow::ChunkedArray> IChunkedArray::Slice(const ui32 offset, const ui32 count) const {
2121
AFL_VERIFY(offset + count <= (ui64)GetRecordsCount())("offset", offset)("count", count)("length", GetRecordsCount());
2222
ui32 currentOffset = offset;
2323
ui32 countLeast = count;
2424
std::vector<std::shared_ptr<arrow::Array>> chunks;
25+
auto address = GetChunk({}, offset);
2526
while (countLeast) {
26-
auto address = GetReadChunk(currentOffset);
27-
if (address.GetPosition() + countLeast <= (ui64)address.GetArray()->length()) {
28-
chunks.emplace_back(address.GetArray()->Slice(address.GetPosition(), countLeast));
27+
address = GetChunk(address, currentOffset);
28+
const ui64 internalPos = currentOffset - address.GetStartPosition();
29+
if (internalPos + countLeast <= (ui64)address.GetArray()->length()) {
30+
chunks.emplace_back(address.GetArray()->Slice(internalPos, countLeast));
2931
break;
3032
} else {
31-
const ui32 deltaCount = address.GetArray()->length() - address.GetPosition();
32-
chunks.emplace_back(address.GetArray()->Slice(address.GetPosition(), deltaCount));
33+
const ui32 deltaCount = address.GetArray()->length() - internalPos;
34+
chunks.emplace_back(address.GetArray()->Slice(internalPos, deltaCount));
3335
AFL_VERIFY(countLeast >= deltaCount);
3436
countLeast -= deltaCount;
3537
currentOffset += deltaCount;
3638
}
3739
}
38-
return std::make_shared<arrow::ChunkedArray>(chunks, ChunkedArray->DataType);
40+
return std::make_shared<arrow::ChunkedArray>(chunks, DataType);
3941
}
4042

4143
TString IChunkedArray::TReader::DebugString(const ui32 position) const {
@@ -89,6 +91,27 @@ class TChunkAccessor {
8991
return ChunkedArray->chunk(idx);
9092
}
9193
};
94+
95+
}
96+
97+
std::partial_ordering IChunkedArray::TCurrentChunkAddress::Compare(const ui64 position, const TCurrentChunkAddress& item, const ui64 itemPosition) const {
98+
AFL_VERIFY(StartPosition <= position);
99+
AFL_VERIFY(position < FinishPosition);
100+
AFL_VERIFY(item.StartPosition <= itemPosition);
101+
AFL_VERIFY(itemPosition < item.FinishPosition);
102+
return TComparator::TypedCompare<true>(*Array, position - StartPosition, *item.Array, itemPosition - item.StartPosition);
103+
}
104+
105+
std::shared_ptr<arrow::Array> IChunkedArray::TCurrentChunkAddress::CopyRecord(const ui64 recordIndex) const {
106+
AFL_VERIFY(StartPosition <= recordIndex);
107+
AFL_VERIFY(recordIndex < FinishPosition);
108+
return NArrow::CopyRecords(Array, { recordIndex - StartPosition });
109+
}
110+
111+
TString IChunkedArray::TCurrentChunkAddress::DebugString(const ui64 position) const {
112+
AFL_VERIFY(position < FinishPosition);
113+
AFL_VERIFY(StartPosition <= position);
114+
return NArrow::DebugString(Array, position - StartPosition);
92115
}
93116

94117
IChunkedArray::TCurrentChunkAddress TTrivialChunkedArray::DoGetChunk(const std::optional<TCurrentChunkAddress>& chunkCurrent, const ui64 position) const {

ydb/core/formats/arrow/common/accessor.h

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,19 +21,31 @@ class IChunkedArray {
2121
private:
2222
YDB_READONLY_DEF(std::shared_ptr<arrow::Array>, Array);
2323
YDB_READONLY(ui64, StartPosition, 0);
24+
YDB_READONLY(ui64, FinishPosition, 0);
2425
YDB_READONLY(ui64, ChunkIndex, 0);
2526
public:
27+
TString DebugString(const ui64 position) const;
28+
2629
ui64 GetLength() const {
2730
return Array->length();
2831
}
2932

33+
bool Contains(const ui64 position) const {
34+
return position >= StartPosition && position < FinishPosition;
35+
}
36+
37+
std::shared_ptr<arrow::Array> CopyRecord(const ui64 recordIndex) const;
38+
39+
std::partial_ordering Compare(const ui64 position, const TCurrentChunkAddress& item, const ui64 itemPosition) const;
40+
3041
TCurrentChunkAddress(const std::shared_ptr<arrow::Array>& arr, const ui64 pos, const ui32 chunkIdx)
3142
: Array(arr)
3243
, StartPosition(pos)
3344
, ChunkIndex(chunkIdx)
3445
{
3546
AFL_VERIFY(arr);
3647
AFL_VERIFY(arr->length());
48+
FinishPosition = StartPosition + arr->length();
3749
}
3850

3951
TString DebugString() const {
@@ -141,7 +153,6 @@ class IChunkedArray {
141153
static std::partial_ordering CompareColumns(const std::vector<TReader>& l, const ui64 lPosition, const std::vector<TReader>& r, const ui64 rPosition);
142154
void AppendPositionTo(arrow::ArrayBuilder& builder, const ui64 position, ui64* recordSize) const;
143155
std::shared_ptr<arrow::Array> CopyRecord(const ui64 recordIndex) const;
144-
std::shared_ptr<arrow::ChunkedArray> Slice(const ui32 offset, const ui32 count) const;
145156
TString DebugString(const ui32 position) const;
146157
};
147158

@@ -150,6 +161,12 @@ class IChunkedArray {
150161
}
151162
virtual ~IChunkedArray() = default;
152163

164+
std::shared_ptr<arrow::ChunkedArray> Slice(const ui32 offset, const ui32 count) const;
165+
166+
TCurrentChunkAddress GetChunk(const std::optional<TCurrentChunkAddress>& chunkCurrent, const ui64 position) const {
167+
return DoGetChunk(chunkCurrent, position);
168+
}
169+
153170
IChunkedArray(const ui64 recordsCount, const EType type, const std::shared_ptr<arrow::DataType>& dataType)
154171
: DataType(dataType)
155172
, RecordsCount(recordsCount)

ydb/core/formats/arrow/reader/batch_iterator.h

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@ namespace NKikimr::NArrow::NMerger {
77
class TBatchIterator {
88
private:
99
bool ControlPointFlag;
10-
TSortableBatchPosition KeyColumns;
11-
TSortableBatchPosition VersionColumns;
10+
TRWSortableBatchPosition KeyColumns;
11+
TRWSortableBatchPosition VersionColumns;
1212
i64 RecordsCount;
1313
int ReverseSortKff;
1414

@@ -34,17 +34,17 @@ class TBatchIterator {
3434
return ControlPointFlag;
3535
}
3636

37-
const TSortableBatchPosition& GetKeyColumns() const {
37+
const TRWSortableBatchPosition& GetKeyColumns() const {
3838
return KeyColumns;
3939
}
4040

41-
const TSortableBatchPosition& GetVersionColumns() const {
41+
const TRWSortableBatchPosition& GetVersionColumns() const {
4242
return VersionColumns;
4343
}
4444

45-
TBatchIterator(const TSortableBatchPosition& keyColumns)
45+
TBatchIterator(TRWSortableBatchPosition&& keyColumns)
4646
: ControlPointFlag(true)
47-
, KeyColumns(keyColumns) {
47+
, KeyColumns(std::move(keyColumns)) {
4848

4949
}
5050

ydb/core/formats/arrow/reader/heap.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ class TSortingHeap {
4747
}
4848

4949
void Push(TSortCursor&& cursor) {
50-
Queue.emplace_back(cursor);
50+
Queue.emplace_back(std::move(cursor));
5151
std::push_heap(Queue.begin(), Queue.end());
5252
NextIdx = 0;
5353
}

ydb/core/formats/arrow/reader/merger.cpp

Lines changed: 49 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,12 @@
44

55
namespace NKikimr::NArrow::NMerger {
66

7-
void TMergePartialStream::PutControlPoint(std::shared_ptr<TSortableBatchPosition> point) {
8-
Y_ABORT_UNLESS(point);
9-
AFL_VERIFY(point->IsSameSortingSchema(SortSchema))("point", point->DebugJson())("schema", SortSchema->ToString());
10-
Y_ABORT_UNLESS(point->IsReverseSort() == Reverse);
7+
void TMergePartialStream::PutControlPoint(const TSortableBatchPosition& point) {
8+
AFL_VERIFY(point.IsSameSortingSchema(SortSchema))("point", point.DebugJson())("schema", SortSchema->ToString());
9+
Y_ABORT_UNLESS(point.IsReverseSort() == Reverse);
1110
Y_ABORT_UNLESS(++ControlPoints == 1);
1211

13-
SortHeap.Push(TBatchIterator(*point));
12+
SortHeap.Push(TBatchIterator(point.BuildRWPosition()));
1413
}
1514

1615
void TMergePartialStream::RemoveControlPoint() {
@@ -21,14 +20,15 @@ void TMergePartialStream::RemoveControlPoint() {
2120
SortHeap.RemoveTop();
2221
}
2322

24-
void TMergePartialStream::CheckSequenceInDebug(const TSortableBatchPosition& nextKeyColumnsPosition) {
23+
void TMergePartialStream::CheckSequenceInDebug(const TRWSortableBatchPosition& nextKeyColumnsPosition) {
2524
#ifndef NDEBUG
25+
auto nextCursor = nextKeyColumnsPosition.BuildSortingCursor();
2626
if (CurrentKeyColumns) {
27-
const bool linearExecutionCorrectness = CurrentKeyColumns->Compare(nextKeyColumnsPosition) == std::partial_ordering::less;
27+
const bool linearExecutionCorrectness = CurrentKeyColumns->Compare(nextCursor) == std::partial_ordering::less;
2828
if (!linearExecutionCorrectness) {
29-
const bool newSegmentScan = nextKeyColumnsPosition.GetPosition() == 0;
30-
AFL_VERIFY(newSegmentScan && nextKeyColumnsPosition.Compare(*CurrentKeyColumns) == std::partial_ordering::less)
31-
("merge_debug", DebugJson())("current_ext", nextKeyColumnsPosition.DebugJson())("newSegmentScan", newSegmentScan);
29+
const bool newSegmentScan = nextCursor.GetPosition() == 0;
30+
AFL_VERIFY(newSegmentScan && nextCursor.Compare(*CurrentKeyColumns) == std::partial_ordering::less)
31+
("merge_debug", DebugJson())("current_ext", nextCursor.DebugJson())("newSegmentScan", newSegmentScan);
3232
}
3333
}
3434
CurrentKeyColumns = nextKeyColumnsPosition;
@@ -37,38 +37,40 @@ void TMergePartialStream::CheckSequenceInDebug(const TSortableBatchPosition& nex
3737
#endif
3838
}
3939

40-
bool TMergePartialStream::DrainToControlPoint(TRecordBatchBuilder& builder, const bool includeFinish, std::optional<TSortableBatchPosition>* lastResultPosition) {
40+
bool TMergePartialStream::DrainToControlPoint(TRecordBatchBuilder& builder, const bool includeFinish, std::optional<TCursor>* lastResultPosition) {
4141
AFL_VERIFY(ControlPoints == 1);
4242
Y_ABORT_UNLESS((ui32)DataSchema->num_fields() == builder.GetBuildersCount());
4343
builder.ValidateDataSchema(DataSchema);
4444
bool cpReachedFlag = false;
45+
std::shared_ptr<TSortableScanData> resultScanData;
46+
ui64 resultPosition;
4547
while (SortHeap.Size() && !cpReachedFlag && !builder.IsBufferExhausted()) {
4648
if (SortHeap.Current().IsControlPoint()) {
47-
auto keyColumns = SortHeap.Current().GetKeyColumns();
49+
auto keyColumns = SortHeap.Current().GetKeyColumns().BuildSortingCursor();
4850
RemoveControlPoint();
4951
cpReachedFlag = true;
5052
if (SortHeap.Empty() || !includeFinish || SortHeap.Current().GetKeyColumns().Compare(keyColumns) == std::partial_ordering::greater) {
53+
if (lastResultPosition && resultScanData) {
54+
*lastResultPosition = resultScanData->BuildCursor(resultPosition);
55+
}
5156
return true;
5257
}
5358
}
5459

55-
if (auto currentPosition = DrainCurrentPosition()) {
56-
CheckSequenceInDebug(*currentPosition);
57-
builder.AddRecord(*currentPosition);
58-
if (lastResultPosition) {
59-
*lastResultPosition = *currentPosition;
60-
}
61-
}
60+
DrainCurrentPosition(&builder, &resultScanData, &resultPosition);
61+
}
62+
if (lastResultPosition && resultScanData) {
63+
*lastResultPosition = resultScanData->BuildCursor(resultPosition);
6264
}
6365
return cpReachedFlag;
6466
}
6567

66-
bool TMergePartialStream::DrainCurrentTo(TRecordBatchBuilder& builder, const TSortableBatchPosition& readTo, const bool includeFinish, std::optional<TSortableBatchPosition>* lastResultPosition) {
67-
PutControlPoint(std::make_shared<TSortableBatchPosition>(readTo));
68+
bool TMergePartialStream::DrainCurrentTo(TRecordBatchBuilder& builder, const TSortableBatchPosition& readTo, const bool includeFinish, std::optional<TCursor>* lastResultPosition) {
69+
PutControlPoint(readTo);
6870
return DrainToControlPoint(builder, includeFinish, lastResultPosition);
6971
}
7072

71-
std::shared_ptr<arrow::Table> TMergePartialStream::SingleSourceDrain(const TSortableBatchPosition& readTo, const bool includeFinish, std::optional<TSortableBatchPosition>* lastResultPosition) {
73+
std::shared_ptr<arrow::Table> TMergePartialStream::SingleSourceDrain(const TSortableBatchPosition& readTo, const bool includeFinish, std::optional<TCursor>* lastResultPosition) {
7274
std::shared_ptr<arrow::Table> result;
7375
if (SortHeap.Empty()) {
7476
return result;
@@ -100,7 +102,7 @@ std::shared_ptr<arrow::Table> TMergePartialStream::SingleSourceDrain(const TSort
100102
result = SortHeap.Current().GetKeyColumns().SliceData(pos.GetPosition() + (include ? 0 : 1), resultSize);
101103
if (lastResultPosition && resultSize) {
102104
auto keys = SortHeap.Current().GetKeyColumns().SliceKeys(pos.GetPosition() + (include ? 0 : 1), resultSize);
103-
*lastResultPosition = TSortableBatchPosition(keys, 0, SortSchema->field_names(), {}, true);
105+
*lastResultPosition = TCursor(keys, 0, SortSchema->field_names());
104106
}
105107
if (SortHeap.Current().GetFilter()) {
106108
SortHeap.Current().GetFilter()->Apply(result, pos.GetPosition() + (include ? 0 : 1), resultSize);
@@ -109,7 +111,7 @@ std::shared_ptr<arrow::Table> TMergePartialStream::SingleSourceDrain(const TSort
109111
result = SortHeap.Current().GetKeyColumns().SliceData(startPos, resultSize);
110112
if (lastResultPosition && resultSize) {
111113
auto keys = SortHeap.Current().GetKeyColumns().SliceKeys(startPos, resultSize);
112-
*lastResultPosition = TSortableBatchPosition(keys, keys->num_rows() - 1, SortSchema->field_names(), {}, false);
114+
*lastResultPosition = TCursor(keys, keys->num_rows() - 1, SortSchema->field_names());
113115
}
114116
if (SortHeap.Current().GetFilter()) {
115117
SortHeap.Current().GetFilter()->Apply(result, startPos, resultSize);
@@ -144,38 +146,43 @@ std::shared_ptr<arrow::Table> TMergePartialStream::SingleSourceDrain(const TSort
144146
void TMergePartialStream::DrainAll(TRecordBatchBuilder& builder) {
145147
Y_ABORT_UNLESS((ui32)DataSchema->num_fields() == builder.GetBuildersCount());
146148
while (SortHeap.Size()) {
147-
if (auto currentPosition = DrainCurrentPosition()) {
148-
CheckSequenceInDebug(*currentPosition);
149-
builder.AddRecord(*currentPosition);
150-
}
149+
DrainCurrentPosition(&builder, nullptr, nullptr);
151150
}
152151
}
153152

154-
std::optional<TSortableBatchPosition> TMergePartialStream::DrainCurrentPosition() {
153+
void TMergePartialStream::DrainCurrentPosition(TRecordBatchBuilder* builder, std::shared_ptr<TSortableScanData>* resultScanData, ui64* resultPosition) {
155154
Y_ABORT_UNLESS(SortHeap.Size());
156155
Y_ABORT_UNLESS(!SortHeap.Current().IsControlPoint());
157-
TSortableBatchPosition result = SortHeap.Current().GetKeyColumns();
158-
TSortableBatchPosition resultVersion = SortHeap.Current().GetVersionColumns();
156+
if (!SortHeap.Current().IsDeleted()) {
157+
if (builder) {
158+
builder->AddRecord(SortHeap.Current().GetKeyColumns());
159+
}
160+
if (resultScanData && resultPosition) {
161+
*resultScanData = SortHeap.Current().GetKeyColumns().GetSorting();
162+
*resultPosition = SortHeap.Current().GetKeyColumns().GetPosition();
163+
}
164+
}
165+
CheckSequenceInDebug(SortHeap.Current().GetKeyColumns());
166+
const ui64 startPosition = SortHeap.Current().GetKeyColumns().GetPosition();
167+
std::shared_ptr<TSortableScanData> startSorting = SortHeap.Current().GetKeyColumns().GetSorting();
168+
std::shared_ptr<TSortableScanData> startVersion = SortHeap.Current().GetVersionColumns().GetSorting();
159169
bool isFirst = true;
160-
const bool deletedFlag = SortHeap.Current().IsDeleted();
161-
while (SortHeap.Size() && (isFirst || result.Compare(SortHeap.Current().GetKeyColumns()) == std::partial_ordering::equivalent)) {
162-
auto& anotherIterator = SortHeap.Current();
170+
while (SortHeap.Size() && (isFirst || SortHeap.Current().GetKeyColumns().Compare(*startSorting, startPosition) == std::partial_ordering::equivalent)) {
163171
if (!isFirst) {
172+
auto& anotherIterator = SortHeap.Current();
164173
if (PossibleSameVersionFlag) {
165-
AFL_VERIFY(resultVersion.Compare(anotherIterator.GetVersionColumns()) != std::partial_ordering::less)("r", resultVersion.DebugJson())("a", anotherIterator.GetVersionColumns().DebugJson())
166-
("key", result.DebugJson());
174+
AFL_VERIFY(anotherIterator.GetVersionColumns().Compare(*startVersion, startPosition) != std::partial_ordering::greater)
175+
("r", startVersion->BuildCursor(startPosition).DebugJson())("a", anotherIterator.GetVersionColumns().DebugJson())
176+
("key", startSorting->BuildCursor(startPosition).DebugJson());
167177
} else {
168-
AFL_VERIFY(resultVersion.Compare(anotherIterator.GetVersionColumns()) == std::partial_ordering::greater)("r", resultVersion.DebugJson())("a", anotherIterator.GetVersionColumns().DebugJson())
169-
("key", result.DebugJson());
178+
AFL_VERIFY(anotherIterator.GetVersionColumns().Compare(*startVersion, startPosition) == std::partial_ordering::less)
179+
("r", startVersion->BuildCursor(startPosition).DebugJson())("a", anotherIterator.GetVersionColumns().DebugJson())
180+
("key", startSorting->BuildCursor(startPosition).DebugJson());
170181
}
171182
}
172183
SortHeap.Next();
173184
isFirst = false;
174185
}
175-
if (deletedFlag) {
176-
return {};
177-
}
178-
return result;
179186
}
180187

181188
std::vector<std::shared_ptr<arrow::RecordBatch>> TMergePartialStream::DrainAllParts(const std::map<TSortableBatchPosition, bool>& positions,

ydb/core/formats/arrow/reader/merger.h

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ namespace NKikimr::NArrow::NMerger {
1111
class TMergePartialStream {
1212
private:
1313
#ifndef NDEBUG
14-
std::optional<TSortableBatchPosition> CurrentKeyColumns;
14+
std::optional<TCursor> CurrentKeyColumns;
1515
#endif
1616
bool PossibleSameVersionFlag = true;
1717

@@ -34,9 +34,9 @@ class TMergePartialStream {
3434
return result;
3535
}
3636

37-
std::optional<TSortableBatchPosition> DrainCurrentPosition();
37+
void DrainCurrentPosition(TRecordBatchBuilder* builder, std::shared_ptr<TSortableScanData>* resultScanData, ui64* resultPosition);
3838

39-
void CheckSequenceInDebug(const TSortableBatchPosition& nextKeyColumnsPosition);
39+
void CheckSequenceInDebug(const TRWSortableBatchPosition& nextKeyColumnsPosition);
4040
public:
4141
TMergePartialStream(std::shared_ptr<arrow::Schema> sortSchema, std::shared_ptr<arrow::Schema> dataSchema, const bool reverse, const std::vector<std::string>& versionColumnNames)
4242
: SortSchema(sortSchema)
@@ -67,7 +67,7 @@ class TMergePartialStream {
6767
return TStringBuilder() << "sort_heap=" << SortHeap.DebugJson();
6868
}
6969

70-
void PutControlPoint(std::shared_ptr<TSortableBatchPosition> point);
70+
void PutControlPoint(const TSortableBatchPosition& point);
7171

7272
void RemoveControlPoint();
7373

@@ -93,9 +93,9 @@ class TMergePartialStream {
9393
}
9494

9595
void DrainAll(TRecordBatchBuilder& builder);
96-
std::shared_ptr<arrow::Table> SingleSourceDrain(const TSortableBatchPosition& readTo, const bool includeFinish, std::optional<TSortableBatchPosition>* lastResultPosition = nullptr);
97-
bool DrainCurrentTo(TRecordBatchBuilder& builder, const TSortableBatchPosition& readTo, const bool includeFinish, std::optional<TSortableBatchPosition>* lastResultPosition = nullptr);
98-
bool DrainToControlPoint(TRecordBatchBuilder& builder, const bool includeFinish, std::optional<TSortableBatchPosition>* lastResultPosition = nullptr);
96+
std::shared_ptr<arrow::Table> SingleSourceDrain(const TSortableBatchPosition& readTo, const bool includeFinish, std::optional<TCursor>* lastResultPosition = nullptr);
97+
bool DrainCurrentTo(TRecordBatchBuilder& builder, const TSortableBatchPosition& readTo, const bool includeFinish, std::optional<TCursor>* lastResultPosition = nullptr);
98+
bool DrainToControlPoint(TRecordBatchBuilder& builder, const bool includeFinish, std::optional<TCursor>* lastResultPosition = nullptr);
9999
std::vector<std::shared_ptr<arrow::RecordBatch>> DrainAllParts(const std::map<TSortableBatchPosition, bool>& positions,
100100
const std::vector<std::shared_ptr<arrow::Field>>& resultFields);
101101
};

0 commit comments

Comments
 (0)