44
55namespace NKikimr ::NArrow::NMerger {
66
7- void TMergePartialStream::PutControlPoint (std::shared_ptr<TSortableBatchPosition> point) {
8- Y_ABORT_UNLESS (point);
9- AFL_VERIFY (point->IsSameSortingSchema (SortSchema))(" point" , point->DebugJson ())(" schema" , SortSchema->ToString ());
10- Y_ABORT_UNLESS (point->IsReverseSort () == Reverse);
7+ void TMergePartialStream::PutControlPoint (const TSortableBatchPosition& point) {
8+ AFL_VERIFY (point.IsSameSortingSchema (SortSchema))(" point" , point.DebugJson ())(" schema" , SortSchema->ToString ());
9+ Y_ABORT_UNLESS (point.IsReverseSort () == Reverse);
1110 Y_ABORT_UNLESS (++ControlPoints == 1 );
1211
13- SortHeap.Push (TBatchIterator (* point));
12+ SortHeap.Push (TBatchIterator (point. BuildRWPosition () ));
1413}
1514
1615void TMergePartialStream::RemoveControlPoint () {
@@ -21,54 +20,56 @@ void TMergePartialStream::RemoveControlPoint() {
2120 SortHeap.RemoveTop ();
2221}
2322
24- void TMergePartialStream::CheckSequenceInDebug (const TSortableBatchPosition & nextKeyColumnsPosition) {
23+ void TMergePartialStream::CheckSequenceInDebug (const TRWSortableBatchPosition & nextKeyColumnsPosition) {
2524#ifndef NDEBUG
2625 if (CurrentKeyColumns) {
27- const bool linearExecutionCorrectness = CurrentKeyColumns-> Compare (nextKeyColumnsPosition ) == std::partial_ordering::less ;
26+ const bool linearExecutionCorrectness = nextKeyColumnsPosition. Compare (*CurrentKeyColumns ) == std::partial_ordering::greater ;
2827 if (!linearExecutionCorrectness) {
2928 const bool newSegmentScan = nextKeyColumnsPosition.GetPosition () == 0 ;
3029 AFL_VERIFY (newSegmentScan && nextKeyColumnsPosition.Compare (*CurrentKeyColumns) == std::partial_ordering::less)
3130 (" merge_debug" , DebugJson ())(" current_ext" , nextKeyColumnsPosition.DebugJson ())(" newSegmentScan" , newSegmentScan);
3231 }
3332 }
34- CurrentKeyColumns = nextKeyColumnsPosition;
33+ CurrentKeyColumns = nextKeyColumnsPosition. BuildSortingCursor () ;
3534#else
3635 Y_UNUSED (nextKeyColumnsPosition);
3736#endif
3837}
3938
40- bool TMergePartialStream::DrainToControlPoint (TRecordBatchBuilder& builder, const bool includeFinish, std::optional<TSortableBatchPosition >* lastResultPosition) {
39+ bool TMergePartialStream::DrainToControlPoint (TRecordBatchBuilder& builder, const bool includeFinish, std::optional<TCursor >* lastResultPosition) {
4140 AFL_VERIFY (ControlPoints == 1 );
4241 Y_ABORT_UNLESS ((ui32)DataSchema->num_fields () == builder.GetBuildersCount ());
4342 builder.ValidateDataSchema (DataSchema);
4443 bool cpReachedFlag = false ;
44+ std::shared_ptr<TSortableScanData> resultScanData;
45+ ui64 resultPosition;
4546 while (SortHeap.Size () && !cpReachedFlag && !builder.IsBufferExhausted ()) {
4647 if (SortHeap.Current ().IsControlPoint ()) {
47- auto keyColumns = SortHeap.Current ().GetKeyColumns ();
48+ auto keyColumns = SortHeap.Current ().GetKeyColumns (). BuildSortingCursor () ;
4849 RemoveControlPoint ();
4950 cpReachedFlag = true ;
5051 if (SortHeap.Empty () || !includeFinish || SortHeap.Current ().GetKeyColumns ().Compare (keyColumns) == std::partial_ordering::greater) {
52+ if (lastResultPosition && resultScanData) {
53+ *lastResultPosition = resultScanData->BuildCursor (resultPosition);
54+ }
5155 return true ;
5256 }
5357 }
5458
55- if (auto currentPosition = DrainCurrentPosition ()) {
56- CheckSequenceInDebug (*currentPosition);
57- builder.AddRecord (*currentPosition);
58- if (lastResultPosition) {
59- *lastResultPosition = *currentPosition;
60- }
61- }
59+ DrainCurrentPosition (&builder, &resultScanData, &resultPosition);
60+ }
61+ if (lastResultPosition && resultScanData) {
62+ *lastResultPosition = resultScanData->BuildCursor (resultPosition);
6263 }
6364 return cpReachedFlag;
6465}
6566
66- bool TMergePartialStream::DrainCurrentTo (TRecordBatchBuilder& builder, const TSortableBatchPosition& readTo, const bool includeFinish, std::optional<TSortableBatchPosition >* lastResultPosition) {
67- PutControlPoint (std::make_shared<TSortableBatchPosition>( readTo) );
67+ bool TMergePartialStream::DrainCurrentTo (TRecordBatchBuilder& builder, const TSortableBatchPosition& readTo, const bool includeFinish, std::optional<TCursor >* lastResultPosition) {
68+ PutControlPoint (readTo);
6869 return DrainToControlPoint (builder, includeFinish, lastResultPosition);
6970}
7071
71- std::shared_ptr<arrow::Table> TMergePartialStream::SingleSourceDrain (const TSortableBatchPosition& readTo, const bool includeFinish, std::optional<TSortableBatchPosition >* lastResultPosition) {
72+ std::shared_ptr<arrow::Table> TMergePartialStream::SingleSourceDrain (const TSortableBatchPosition& readTo, const bool includeFinish, std::optional<TCursor >* lastResultPosition) {
7273 std::shared_ptr<arrow::Table> result;
7374 if (SortHeap.Empty ()) {
7475 return result;
@@ -100,7 +101,7 @@ std::shared_ptr<arrow::Table> TMergePartialStream::SingleSourceDrain(const TSort
100101 result = SortHeap.Current ().GetKeyColumns ().SliceData (pos.GetPosition () + (include ? 0 : 1 ), resultSize);
101102 if (lastResultPosition && resultSize) {
102103 auto keys = SortHeap.Current ().GetKeyColumns ().SliceKeys (pos.GetPosition () + (include ? 0 : 1 ), resultSize);
103- *lastResultPosition = TSortableBatchPosition (keys, 0 , SortSchema->field_names (), {}, true );
104+ *lastResultPosition = TCursor (keys, 0 , SortSchema->field_names ());
104105 }
105106 if (SortHeap.Current ().GetFilter ()) {
106107 SortHeap.Current ().GetFilter ()->Apply (result, pos.GetPosition () + (include ? 0 : 1 ), resultSize);
@@ -109,7 +110,7 @@ std::shared_ptr<arrow::Table> TMergePartialStream::SingleSourceDrain(const TSort
109110 result = SortHeap.Current ().GetKeyColumns ().SliceData (startPos, resultSize);
110111 if (lastResultPosition && resultSize) {
111112 auto keys = SortHeap.Current ().GetKeyColumns ().SliceKeys (startPos, resultSize);
112- *lastResultPosition = TSortableBatchPosition (keys, keys->num_rows () - 1 , SortSchema->field_names (), {}, false );
113+ *lastResultPosition = TCursor (keys, keys->num_rows () - 1 , SortSchema->field_names ());
113114 }
114115 if (SortHeap.Current ().GetFilter ()) {
115116 SortHeap.Current ().GetFilter ()->Apply (result, startPos, resultSize);
@@ -144,38 +145,43 @@ std::shared_ptr<arrow::Table> TMergePartialStream::SingleSourceDrain(const TSort
144145void TMergePartialStream::DrainAll (TRecordBatchBuilder& builder) {
145146 Y_ABORT_UNLESS ((ui32)DataSchema->num_fields () == builder.GetBuildersCount ());
146147 while (SortHeap.Size ()) {
147- if (auto currentPosition = DrainCurrentPosition ()) {
148- CheckSequenceInDebug (*currentPosition);
149- builder.AddRecord (*currentPosition);
150- }
148+ DrainCurrentPosition (&builder, nullptr , nullptr );
151149 }
152150}
153151
154- std::optional<TSortableBatchPosition> TMergePartialStream::DrainCurrentPosition () {
152+ void TMergePartialStream::DrainCurrentPosition (TRecordBatchBuilder* builder, std::shared_ptr<TSortableScanData>* resultScanData, ui64* resultPosition ) {
155153 Y_ABORT_UNLESS (SortHeap.Size ());
156154 Y_ABORT_UNLESS (!SortHeap.Current ().IsControlPoint ());
157- TSortableBatchPosition result = SortHeap.Current ().GetKeyColumns ();
158- TSortableBatchPosition resultVersion = SortHeap.Current ().GetVersionColumns ();
155+ if (!SortHeap.Current ().IsDeleted ()) {
156+ if (builder) {
157+ builder->AddRecord (SortHeap.Current ().GetKeyColumns ());
158+ }
159+ if (resultScanData && resultPosition) {
160+ *resultScanData = SortHeap.Current ().GetKeyColumns ().GetSorting ();
161+ *resultPosition = SortHeap.Current ().GetKeyColumns ().GetPosition ();
162+ }
163+ }
164+ CheckSequenceInDebug (SortHeap.Current ().GetKeyColumns ());
165+ const ui64 startPosition = SortHeap.Current ().GetKeyColumns ().GetPosition ();
166+ std::shared_ptr<TSortableScanData> startSorting = SortHeap.Current ().GetKeyColumns ().GetSorting ();
167+ std::shared_ptr<TSortableScanData> startVersion = SortHeap.Current ().GetVersionColumns ().GetSorting ();
159168 bool isFirst = true ;
160- const bool deletedFlag = SortHeap.Current ().IsDeleted ();
161- while (SortHeap.Size () && (isFirst || result.Compare (SortHeap.Current ().GetKeyColumns ()) == std::partial_ordering::equivalent)) {
162- auto & anotherIterator = SortHeap.Current ();
169+ while (SortHeap.Size () && (isFirst || SortHeap.Current ().GetKeyColumns ().Compare (*startSorting, startPosition) == std::partial_ordering::equivalent)) {
163170 if (!isFirst) {
171+ auto & anotherIterator = SortHeap.Current ();
164172 if (PossibleSameVersionFlag) {
165- AFL_VERIFY (resultVersion.Compare (anotherIterator.GetVersionColumns ()) != std::partial_ordering::less)(" r" , resultVersion.DebugJson ())(" a" , anotherIterator.GetVersionColumns ().DebugJson ())
166- (" key" , result.DebugJson ());
173+ AFL_VERIFY (anotherIterator.GetVersionColumns ().Compare (*startVersion, startPosition) != std::partial_ordering::greater)
174+ (" r" , startVersion->BuildCursor (startPosition).DebugJson ())(" a" , anotherIterator.GetVersionColumns ().DebugJson ())
175+ (" key" , startSorting->BuildCursor (startPosition).DebugJson ());
167176 } else {
168- AFL_VERIFY (resultVersion.Compare (anotherIterator.GetVersionColumns ()) == std::partial_ordering::greater)(" r" , resultVersion.DebugJson ())(" a" , anotherIterator.GetVersionColumns ().DebugJson ())
169- (" key" , result.DebugJson ());
177+ AFL_VERIFY (anotherIterator.GetVersionColumns ().Compare (*startVersion, startPosition) == std::partial_ordering::less)
178+ (" r" , startVersion->BuildCursor (startPosition).DebugJson ())(" a" , anotherIterator.GetVersionColumns ().DebugJson ())
179+ (" key" , startSorting->BuildCursor (startPosition).DebugJson ());
170180 }
171181 }
172182 SortHeap.Next ();
173183 isFirst = false ;
174184 }
175- if (deletedFlag) {
176- return {};
177- }
178- return result;
179185}
180186
181187std::vector<std::shared_ptr<arrow::RecordBatch>> TMergePartialStream::DrainAllParts (const std::map<TSortableBatchPosition, bool >& positions,
0 commit comments