Skip to content

Commit 1c9fffe

Browse files
Merge 8f5207d into 6569863
2 parents 6569863 + 8f5207d commit 1c9fffe

File tree

16 files changed

+544
-208
lines changed

16 files changed

+544
-208
lines changed

ydb/core/formats/arrow/common/accessor.cpp

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,25 +17,27 @@ std::shared_ptr<arrow::Array> IChunkedArray::TReader::CopyRecord(const ui64 reco
1717
return NArrow::CopyRecords(address.GetArray(), {address.GetPosition()});
1818
}
1919

20-
std::shared_ptr<arrow::ChunkedArray> IChunkedArray::TReader::Slice(const ui32 offset, const ui32 count) const {
20+
std::shared_ptr<arrow::ChunkedArray> IChunkedArray::Slice(const ui32 offset, const ui32 count) const {
2121
AFL_VERIFY(offset + count <= (ui64)GetRecordsCount())("offset", offset)("count", count)("length", GetRecordsCount());
2222
ui32 currentOffset = offset;
2323
ui32 countLeast = count;
2424
std::vector<std::shared_ptr<arrow::Array>> chunks;
25+
auto address = GetChunk({}, offset);
2526
while (countLeast) {
26-
auto address = GetReadChunk(currentOffset);
27-
if (address.GetPosition() + countLeast <= (ui64)address.GetArray()->length()) {
28-
chunks.emplace_back(address.GetArray()->Slice(address.GetPosition(), countLeast));
27+
address = GetChunk(address, currentOffset);
28+
const ui64 internalPos = currentOffset - address.GetStartPosition();
29+
if (internalPos + countLeast <= (ui64)address.GetArray()->length()) {
30+
chunks.emplace_back(address.GetArray()->Slice(internalPos, countLeast));
2931
break;
3032
} else {
31-
const ui32 deltaCount = address.GetArray()->length() - address.GetPosition();
32-
chunks.emplace_back(address.GetArray()->Slice(address.GetPosition(), deltaCount));
33+
const ui32 deltaCount = address.GetArray()->length() - internalPos;
34+
chunks.emplace_back(address.GetArray()->Slice(internalPos, deltaCount));
3335
AFL_VERIFY(countLeast >= deltaCount);
3436
countLeast -= deltaCount;
3537
currentOffset += deltaCount;
3638
}
3739
}
38-
return std::make_shared<arrow::ChunkedArray>(chunks, ChunkedArray->DataType);
40+
return std::make_shared<arrow::ChunkedArray>(chunks, DataType);
3941
}
4042

4143
TString IChunkedArray::TReader::DebugString(const ui32 position) const {
@@ -89,6 +91,27 @@ class TChunkAccessor {
8991
return ChunkedArray->chunk(idx);
9092
}
9193
};
94+
95+
}
96+
97+
std::partial_ordering IChunkedArray::TCurrentChunkAddress::Compare(const ui64 position, const TCurrentChunkAddress& item, const ui64 itemPosition) const {
98+
AFL_VERIFY(StartPosition <= position);
99+
AFL_VERIFY(position < FinishPosition);
100+
AFL_VERIFY(item.StartPosition <= itemPosition);
101+
AFL_VERIFY(itemPosition < item.FinishPosition);
102+
return TComparator::TypedCompare<true>(*Array, position - StartPosition, *item.Array, itemPosition - item.StartPosition);
103+
}
104+
105+
std::shared_ptr<arrow::Array> IChunkedArray::TCurrentChunkAddress::CopyRecord(const ui64 recordIndex) const {
106+
AFL_VERIFY(StartPosition <= recordIndex);
107+
AFL_VERIFY(recordIndex < FinishPosition);
108+
return NArrow::CopyRecords(Array, { recordIndex - StartPosition });
109+
}
110+
111+
TString IChunkedArray::TCurrentChunkAddress::DebugString(const ui64 position) const {
112+
AFL_VERIFY(position < FinishPosition);
113+
AFL_VERIFY(StartPosition <= position);
114+
return NArrow::DebugString(Array, position - StartPosition);
92115
}
93116

94117
IChunkedArray::TCurrentChunkAddress TTrivialChunkedArray::DoGetChunk(const std::optional<TCurrentChunkAddress>& chunkCurrent, const ui64 position) const {

ydb/core/formats/arrow/common/accessor.h

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,19 +21,31 @@ class IChunkedArray {
2121
private:
2222
YDB_READONLY_DEF(std::shared_ptr<arrow::Array>, Array);
2323
YDB_READONLY(ui64, StartPosition, 0);
24+
YDB_READONLY(ui64, FinishPosition, 0);
2425
YDB_READONLY(ui64, ChunkIndex, 0);
2526
public:
27+
TString DebugString(const ui64 position) const;
28+
2629
ui64 GetLength() const {
2730
return Array->length();
2831
}
2932

33+
bool Contains(const ui64 position) const {
34+
return position >= StartPosition && position < FinishPosition;
35+
}
36+
37+
std::shared_ptr<arrow::Array> CopyRecord(const ui64 recordIndex) const;
38+
39+
std::partial_ordering Compare(const ui64 position, const TCurrentChunkAddress& item, const ui64 itemPosition) const;
40+
3041
TCurrentChunkAddress(const std::shared_ptr<arrow::Array>& arr, const ui64 pos, const ui32 chunkIdx)
3142
: Array(arr)
3243
, StartPosition(pos)
3344
, ChunkIndex(chunkIdx)
3445
{
3546
AFL_VERIFY(arr);
3647
AFL_VERIFY(arr->length());
48+
FinishPosition = StartPosition + arr->length();
3749
}
3850

3951
TString DebugString() const {
@@ -141,7 +153,6 @@ class IChunkedArray {
141153
static std::partial_ordering CompareColumns(const std::vector<TReader>& l, const ui64 lPosition, const std::vector<TReader>& r, const ui64 rPosition);
142154
void AppendPositionTo(arrow::ArrayBuilder& builder, const ui64 position, ui64* recordSize) const;
143155
std::shared_ptr<arrow::Array> CopyRecord(const ui64 recordIndex) const;
144-
std::shared_ptr<arrow::ChunkedArray> Slice(const ui32 offset, const ui32 count) const;
145156
TString DebugString(const ui32 position) const;
146157
};
147158

@@ -150,6 +161,12 @@ class IChunkedArray {
150161
}
151162
virtual ~IChunkedArray() = default;
152163

164+
std::shared_ptr<arrow::ChunkedArray> Slice(const ui32 offset, const ui32 count) const;
165+
166+
TCurrentChunkAddress GetChunk(const std::optional<TCurrentChunkAddress>& chunkCurrent, const ui64 position) const {
167+
return DoGetChunk(chunkCurrent, position);
168+
}
169+
153170
IChunkedArray(const ui64 recordsCount, const EType type, const std::shared_ptr<arrow::DataType>& dataType)
154171
: DataType(dataType)
155172
, RecordsCount(recordsCount)

ydb/core/formats/arrow/reader/batch_iterator.h

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@ namespace NKikimr::NArrow::NMerger {
77
class TBatchIterator {
88
private:
99
bool ControlPointFlag;
10-
TSortableBatchPosition KeyColumns;
11-
TSortableBatchPosition VersionColumns;
10+
TRWSortableBatchPosition KeyColumns;
11+
TRWSortableBatchPosition VersionColumns;
1212
i64 RecordsCount;
1313
int ReverseSortKff;
1414

@@ -34,17 +34,17 @@ class TBatchIterator {
3434
return ControlPointFlag;
3535
}
3636

37-
const TSortableBatchPosition& GetKeyColumns() const {
37+
const TRWSortableBatchPosition& GetKeyColumns() const {
3838
return KeyColumns;
3939
}
4040

41-
const TSortableBatchPosition& GetVersionColumns() const {
41+
const TRWSortableBatchPosition& GetVersionColumns() const {
4242
return VersionColumns;
4343
}
4444

45-
TBatchIterator(const TSortableBatchPosition& keyColumns)
45+
TBatchIterator(TRWSortableBatchPosition&& keyColumns)
4646
: ControlPointFlag(true)
47-
, KeyColumns(keyColumns) {
47+
, KeyColumns(std::move(keyColumns)) {
4848

4949
}
5050

ydb/core/formats/arrow/reader/heap.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ class TSortingHeap {
4747
}
4848

4949
void Push(TSortCursor&& cursor) {
50-
Queue.emplace_back(cursor);
50+
Queue.emplace_back(std::move(cursor));
5151
std::push_heap(Queue.begin(), Queue.end());
5252
NextIdx = 0;
5353
}

ydb/core/formats/arrow/reader/merger.cpp

Lines changed: 46 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,12 @@
44

55
namespace NKikimr::NArrow::NMerger {
66

7-
void TMergePartialStream::PutControlPoint(std::shared_ptr<TSortableBatchPosition> point) {
8-
Y_ABORT_UNLESS(point);
9-
AFL_VERIFY(point->IsSameSortingSchema(SortSchema))("point", point->DebugJson())("schema", SortSchema->ToString());
10-
Y_ABORT_UNLESS(point->IsReverseSort() == Reverse);
7+
void TMergePartialStream::PutControlPoint(const TSortableBatchPosition& point) {
8+
AFL_VERIFY(point.IsSameSortingSchema(SortSchema))("point", point.DebugJson())("schema", SortSchema->ToString());
9+
Y_ABORT_UNLESS(point.IsReverseSort() == Reverse);
1110
Y_ABORT_UNLESS(++ControlPoints == 1);
1211

13-
SortHeap.Push(TBatchIterator(*point));
12+
SortHeap.Push(TBatchIterator(point.BuildRWPosition()));
1413
}
1514

1615
void TMergePartialStream::RemoveControlPoint() {
@@ -21,54 +20,56 @@ void TMergePartialStream::RemoveControlPoint() {
2120
SortHeap.RemoveTop();
2221
}
2322

24-
void TMergePartialStream::CheckSequenceInDebug(const TSortableBatchPosition& nextKeyColumnsPosition) {
23+
void TMergePartialStream::CheckSequenceInDebug(const TRWSortableBatchPosition& nextKeyColumnsPosition) {
2524
#ifndef NDEBUG
2625
if (CurrentKeyColumns) {
27-
const bool linearExecutionCorrectness = CurrentKeyColumns->Compare(nextKeyColumnsPosition) == std::partial_ordering::less;
26+
const bool linearExecutionCorrectness = nextKeyColumnsPosition.Compare(*CurrentKeyColumns) == std::partial_ordering::greater;
2827
if (!linearExecutionCorrectness) {
2928
const bool newSegmentScan = nextKeyColumnsPosition.GetPosition() == 0;
3029
AFL_VERIFY(newSegmentScan && nextKeyColumnsPosition.Compare(*CurrentKeyColumns) == std::partial_ordering::less)
3130
("merge_debug", DebugJson())("current_ext", nextKeyColumnsPosition.DebugJson())("newSegmentScan", newSegmentScan);
3231
}
3332
}
34-
CurrentKeyColumns = nextKeyColumnsPosition;
33+
CurrentKeyColumns = nextKeyColumnsPosition.BuildSortingCursor();
3534
#else
3635
Y_UNUSED(nextKeyColumnsPosition);
3736
#endif
3837
}
3938

40-
bool TMergePartialStream::DrainToControlPoint(TRecordBatchBuilder& builder, const bool includeFinish, std::optional<TSortableBatchPosition>* lastResultPosition) {
39+
bool TMergePartialStream::DrainToControlPoint(TRecordBatchBuilder& builder, const bool includeFinish, std::optional<TCursor>* lastResultPosition) {
4140
AFL_VERIFY(ControlPoints == 1);
4241
Y_ABORT_UNLESS((ui32)DataSchema->num_fields() == builder.GetBuildersCount());
4342
builder.ValidateDataSchema(DataSchema);
4443
bool cpReachedFlag = false;
44+
std::shared_ptr<TSortableScanData> resultScanData;
45+
ui64 resultPosition;
4546
while (SortHeap.Size() && !cpReachedFlag && !builder.IsBufferExhausted()) {
4647
if (SortHeap.Current().IsControlPoint()) {
47-
auto keyColumns = SortHeap.Current().GetKeyColumns();
48+
auto keyColumns = SortHeap.Current().GetKeyColumns().BuildSortingCursor();
4849
RemoveControlPoint();
4950
cpReachedFlag = true;
5051
if (SortHeap.Empty() || !includeFinish || SortHeap.Current().GetKeyColumns().Compare(keyColumns) == std::partial_ordering::greater) {
52+
if (lastResultPosition && resultScanData) {
53+
*lastResultPosition = resultScanData->BuildCursor(resultPosition);
54+
}
5155
return true;
5256
}
5357
}
5458

55-
if (auto currentPosition = DrainCurrentPosition()) {
56-
CheckSequenceInDebug(*currentPosition);
57-
builder.AddRecord(*currentPosition);
58-
if (lastResultPosition) {
59-
*lastResultPosition = *currentPosition;
60-
}
61-
}
59+
DrainCurrentPosition(&builder, &resultScanData, &resultPosition);
60+
}
61+
if (lastResultPosition && resultScanData) {
62+
*lastResultPosition = resultScanData->BuildCursor(resultPosition);
6263
}
6364
return cpReachedFlag;
6465
}
6566

66-
bool TMergePartialStream::DrainCurrentTo(TRecordBatchBuilder& builder, const TSortableBatchPosition& readTo, const bool includeFinish, std::optional<TSortableBatchPosition>* lastResultPosition) {
67-
PutControlPoint(std::make_shared<TSortableBatchPosition>(readTo));
67+
bool TMergePartialStream::DrainCurrentTo(TRecordBatchBuilder& builder, const TSortableBatchPosition& readTo, const bool includeFinish, std::optional<TCursor>* lastResultPosition) {
68+
PutControlPoint(readTo);
6869
return DrainToControlPoint(builder, includeFinish, lastResultPosition);
6970
}
7071

71-
std::shared_ptr<arrow::Table> TMergePartialStream::SingleSourceDrain(const TSortableBatchPosition& readTo, const bool includeFinish, std::optional<TSortableBatchPosition>* lastResultPosition) {
72+
std::shared_ptr<arrow::Table> TMergePartialStream::SingleSourceDrain(const TSortableBatchPosition& readTo, const bool includeFinish, std::optional<TCursor>* lastResultPosition) {
7273
std::shared_ptr<arrow::Table> result;
7374
if (SortHeap.Empty()) {
7475
return result;
@@ -100,7 +101,7 @@ std::shared_ptr<arrow::Table> TMergePartialStream::SingleSourceDrain(const TSort
100101
result = SortHeap.Current().GetKeyColumns().SliceData(pos.GetPosition() + (include ? 0 : 1), resultSize);
101102
if (lastResultPosition && resultSize) {
102103
auto keys = SortHeap.Current().GetKeyColumns().SliceKeys(pos.GetPosition() + (include ? 0 : 1), resultSize);
103-
*lastResultPosition = TSortableBatchPosition(keys, 0, SortSchema->field_names(), {}, true);
104+
*lastResultPosition = TCursor(keys, 0, SortSchema->field_names());
104105
}
105106
if (SortHeap.Current().GetFilter()) {
106107
SortHeap.Current().GetFilter()->Apply(result, pos.GetPosition() + (include ? 0 : 1), resultSize);
@@ -109,7 +110,7 @@ std::shared_ptr<arrow::Table> TMergePartialStream::SingleSourceDrain(const TSort
109110
result = SortHeap.Current().GetKeyColumns().SliceData(startPos, resultSize);
110111
if (lastResultPosition && resultSize) {
111112
auto keys = SortHeap.Current().GetKeyColumns().SliceKeys(startPos, resultSize);
112-
*lastResultPosition = TSortableBatchPosition(keys, keys->num_rows() - 1, SortSchema->field_names(), {}, false);
113+
*lastResultPosition = TCursor(keys, keys->num_rows() - 1, SortSchema->field_names());
113114
}
114115
if (SortHeap.Current().GetFilter()) {
115116
SortHeap.Current().GetFilter()->Apply(result, startPos, resultSize);
@@ -144,38 +145,43 @@ std::shared_ptr<arrow::Table> TMergePartialStream::SingleSourceDrain(const TSort
144145
void TMergePartialStream::DrainAll(TRecordBatchBuilder& builder) {
145146
Y_ABORT_UNLESS((ui32)DataSchema->num_fields() == builder.GetBuildersCount());
146147
while (SortHeap.Size()) {
147-
if (auto currentPosition = DrainCurrentPosition()) {
148-
CheckSequenceInDebug(*currentPosition);
149-
builder.AddRecord(*currentPosition);
150-
}
148+
DrainCurrentPosition(&builder, nullptr, nullptr);
151149
}
152150
}
153151

154-
std::optional<TSortableBatchPosition> TMergePartialStream::DrainCurrentPosition() {
152+
void TMergePartialStream::DrainCurrentPosition(TRecordBatchBuilder* builder, std::shared_ptr<TSortableScanData>* resultScanData, ui64* resultPosition) {
155153
Y_ABORT_UNLESS(SortHeap.Size());
156154
Y_ABORT_UNLESS(!SortHeap.Current().IsControlPoint());
157-
TSortableBatchPosition result = SortHeap.Current().GetKeyColumns();
158-
TSortableBatchPosition resultVersion = SortHeap.Current().GetVersionColumns();
155+
if (!SortHeap.Current().IsDeleted()) {
156+
if (builder) {
157+
builder->AddRecord(SortHeap.Current().GetKeyColumns());
158+
}
159+
if (resultScanData && resultPosition) {
160+
*resultScanData = SortHeap.Current().GetKeyColumns().GetSorting();
161+
*resultPosition = SortHeap.Current().GetKeyColumns().GetPosition();
162+
}
163+
}
164+
CheckSequenceInDebug(SortHeap.Current().GetKeyColumns());
165+
const ui64 startPosition = SortHeap.Current().GetKeyColumns().GetPosition();
166+
std::shared_ptr<TSortableScanData> startSorting = SortHeap.Current().GetKeyColumns().GetSorting();
167+
std::shared_ptr<TSortableScanData> startVersion = SortHeap.Current().GetVersionColumns().GetSorting();
159168
bool isFirst = true;
160-
const bool deletedFlag = SortHeap.Current().IsDeleted();
161-
while (SortHeap.Size() && (isFirst || result.Compare(SortHeap.Current().GetKeyColumns()) == std::partial_ordering::equivalent)) {
162-
auto& anotherIterator = SortHeap.Current();
169+
while (SortHeap.Size() && (isFirst || SortHeap.Current().GetKeyColumns().Compare(*startSorting, startPosition) == std::partial_ordering::equivalent)) {
163170
if (!isFirst) {
171+
auto& anotherIterator = SortHeap.Current();
164172
if (PossibleSameVersionFlag) {
165-
AFL_VERIFY(resultVersion.Compare(anotherIterator.GetVersionColumns()) != std::partial_ordering::less)("r", resultVersion.DebugJson())("a", anotherIterator.GetVersionColumns().DebugJson())
166-
("key", result.DebugJson());
173+
AFL_VERIFY(anotherIterator.GetVersionColumns().Compare(*startVersion, startPosition) != std::partial_ordering::greater)
174+
("r", startVersion->BuildCursor(startPosition).DebugJson())("a", anotherIterator.GetVersionColumns().DebugJson())
175+
("key", startSorting->BuildCursor(startPosition).DebugJson());
167176
} else {
168-
AFL_VERIFY(resultVersion.Compare(anotherIterator.GetVersionColumns()) == std::partial_ordering::greater)("r", resultVersion.DebugJson())("a", anotherIterator.GetVersionColumns().DebugJson())
169-
("key", result.DebugJson());
177+
AFL_VERIFY(anotherIterator.GetVersionColumns().Compare(*startVersion, startPosition) == std::partial_ordering::less)
178+
("r", startVersion->BuildCursor(startPosition).DebugJson())("a", anotherIterator.GetVersionColumns().DebugJson())
179+
("key", startSorting->BuildCursor(startPosition).DebugJson());
170180
}
171181
}
172182
SortHeap.Next();
173183
isFirst = false;
174184
}
175-
if (deletedFlag) {
176-
return {};
177-
}
178-
return result;
179185
}
180186

181187
std::vector<std::shared_ptr<arrow::RecordBatch>> TMergePartialStream::DrainAllParts(const std::map<TSortableBatchPosition, bool>& positions,

ydb/core/formats/arrow/reader/merger.h

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ namespace NKikimr::NArrow::NMerger {
1111
class TMergePartialStream {
1212
private:
1313
#ifndef NDEBUG
14-
std::optional<TSortableBatchPosition> CurrentKeyColumns;
14+
std::optional<TCursor> CurrentKeyColumns;
1515
#endif
1616
bool PossibleSameVersionFlag = true;
1717

@@ -34,9 +34,9 @@ class TMergePartialStream {
3434
return result;
3535
}
3636

37-
std::optional<TSortableBatchPosition> DrainCurrentPosition();
37+
void DrainCurrentPosition(TRecordBatchBuilder* builder, std::shared_ptr<TSortableScanData>* resultScanData, ui64* resultPosition);
3838

39-
void CheckSequenceInDebug(const TSortableBatchPosition& nextKeyColumnsPosition);
39+
void CheckSequenceInDebug(const TRWSortableBatchPosition& nextKeyColumnsPosition);
4040
public:
4141
TMergePartialStream(std::shared_ptr<arrow::Schema> sortSchema, std::shared_ptr<arrow::Schema> dataSchema, const bool reverse, const std::vector<std::string>& versionColumnNames)
4242
: SortSchema(sortSchema)
@@ -67,7 +67,7 @@ class TMergePartialStream {
6767
return TStringBuilder() << "sort_heap=" << SortHeap.DebugJson();
6868
}
6969

70-
void PutControlPoint(std::shared_ptr<TSortableBatchPosition> point);
70+
void PutControlPoint(const TSortableBatchPosition& point);
7171

7272
void RemoveControlPoint();
7373

@@ -93,9 +93,9 @@ class TMergePartialStream {
9393
}
9494

9595
void DrainAll(TRecordBatchBuilder& builder);
96-
std::shared_ptr<arrow::Table> SingleSourceDrain(const TSortableBatchPosition& readTo, const bool includeFinish, std::optional<TSortableBatchPosition>* lastResultPosition = nullptr);
97-
bool DrainCurrentTo(TRecordBatchBuilder& builder, const TSortableBatchPosition& readTo, const bool includeFinish, std::optional<TSortableBatchPosition>* lastResultPosition = nullptr);
98-
bool DrainToControlPoint(TRecordBatchBuilder& builder, const bool includeFinish, std::optional<TSortableBatchPosition>* lastResultPosition = nullptr);
96+
std::shared_ptr<arrow::Table> SingleSourceDrain(const TSortableBatchPosition& readTo, const bool includeFinish, std::optional<TCursor>* lastResultPosition = nullptr);
97+
bool DrainCurrentTo(TRecordBatchBuilder& builder, const TSortableBatchPosition& readTo, const bool includeFinish, std::optional<TCursor>* lastResultPosition = nullptr);
98+
bool DrainToControlPoint(TRecordBatchBuilder& builder, const bool includeFinish, std::optional<TCursor>* lastResultPosition = nullptr);
9999
std::vector<std::shared_ptr<arrow::RecordBatch>> DrainAllParts(const std::map<TSortableBatchPosition, bool>& positions,
100100
const std::vector<std::shared_ptr<arrow::Field>>& resultFields);
101101
};

0 commit comments

Comments
 (0)