@@ -131,16 +131,16 @@ struct TKeyEq {
131131TKqpStreamLookupWorker::TKqpStreamLookupWorker (NKikimrKqp::TKqpStreamLookupSettings&& settings,
132132 const NMiniKQL::TTypeEnvironment& typeEnv, const NMiniKQL::THolderFactory& holderFactory,
133133 const NYql::NDqProto::TTaskInput& inputDesc)
134- : TypeEnv(typeEnv)
134+ : Settings(std::move(settings))
135+ , TypeEnv(typeEnv)
135136 , HolderFactory(holderFactory)
136137 , InputDesc(inputDesc)
137- , TablePath(settings.GetTable().GetPath())
138- , TableId(MakeTableId(settings.GetTable()))
139- , Strategy(settings.GetLookupStrategy()) {
138+ , TablePath(Settings.GetTable().GetPath())
139+ , TableId(MakeTableId(Settings.GetTable())) {
140140
141- KeyColumns.reserve (settings .GetKeyColumns ().size ());
141+ KeyColumns.reserve (Settings .GetKeyColumns ().size ());
142142 i32 keyOrder = 0 ;
143- for (const auto & keyColumn : settings .GetKeyColumns ()) {
143+ for (const auto & keyColumn : Settings .GetKeyColumns ()) {
144144 NScheme::TTypeInfo typeInfo = NScheme::TypeInfoFromProto (keyColumn.GetTypeId (), keyColumn.GetTypeInfo ());
145145
146146 KeyColumns.emplace (
@@ -155,15 +155,15 @@ TKqpStreamLookupWorker::TKqpStreamLookupWorker(NKikimrKqp::TKqpStreamLookupSetti
155155 );
156156 }
157157
158- LookupKeyColumns.reserve (settings .GetLookupKeyColumns ().size ());
159- for (const auto & lookupKey : settings .GetLookupKeyColumns ()) {
158+ LookupKeyColumns.reserve (Settings .GetLookupKeyColumns ().size ());
159+ for (const auto & lookupKey : Settings .GetLookupKeyColumns ()) {
160160 auto columnIt = KeyColumns.find (lookupKey);
161161 YQL_ENSURE (columnIt != KeyColumns.end ());
162162 LookupKeyColumns.push_back (&columnIt->second );
163163 }
164164
165- Columns.reserve (settings .GetColumns ().size ());
166- for (const auto & column : settings .GetColumns ()) {
165+ Columns.reserve (Settings .GetColumns ().size ());
166+ for (const auto & column : Settings .GetColumns ()) {
167167 NScheme::TTypeInfo typeInfo = NScheme::TypeInfoFromProto (column.GetTypeId (), column.GetTypeInfo ());
168168
169169 Columns.emplace_back (TSysTables::TTableColumnInfo{
@@ -634,7 +634,7 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
634634 }
635635 }
636636
637- PendingLeftRowsByKey.insert (std::make_pair (std::move (joinKey), TLeftRowInfo{std::move (leftData)}));
637+ PendingLeftRowsByKey.insert (std::make_pair (std::move (joinKey), TLeftRowInfo{std::move (leftData), InputRowSeqNo++ }));
638638 }
639639
640640 std::vector<std::pair<ui64, THolder<TEvDataShard::TEvRead>>> requests;
@@ -675,17 +675,62 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
675675 const auto & record = result.ReadResult ->Get ()->Record ;
676676 YQL_ENSURE (record.GetStatus ().GetCode () == Ydb::StatusIds::SUCCESS);
677677
678- auto it = PendingKeysByReadId.find (record.GetReadId ());
679- YQL_ENSURE (it != PendingKeysByReadId.end ());
678+ auto pendingKeysIt = PendingKeysByReadId.find (record.GetReadId ());
679+ YQL_ENSURE (pendingKeysIt != PendingKeysByReadId.end ());
680680
681- ReadResults.emplace_back (std::move (result));
681+ for (; result.UnprocessedResultRow < result.ReadResult ->Get ()->GetRowsCount (); ++result.UnprocessedResultRow ) {
682+ const auto & row = result.ReadResult ->Get ()->GetCells (result.UnprocessedResultRow );
683+ // result can contain fewer columns because of system columns
684+ YQL_ENSURE (row.size () <= ReadColumns.size (), " Result columns mismatch" );
685+
686+ std::vector<TCell> joinKeyCells (LookupKeyColumns.size ());
687+ for (size_t joinKeyColumn = 0 ; joinKeyColumn < LookupKeyColumns.size (); ++joinKeyColumn) {
688+ auto columnIt = ReadColumns.find (LookupKeyColumns[joinKeyColumn]->Name );
689+ YQL_ENSURE (columnIt != ReadColumns.end ());
690+ joinKeyCells[LookupKeyColumns[joinKeyColumn]->KeyOrder ] = row[std::distance (ReadColumns.begin (), columnIt)];
691+ }
692+
693+ auto leftRowIt = PendingLeftRowsByKey.find (joinKeyCells);
694+ YQL_ENSURE (leftRowIt != PendingLeftRowsByKey.end ());
695+
696+ if (Settings.GetLookupStrategy () == NKqpProto::EStreamLookupStrategy::SEMI_JOIN && leftRowIt->second .RightRowExist ) {
697+ // semi join should return one result row per key
698+ continue ;
699+ }
700+
701+ TReadResultStats rowStats;
702+ auto resultRow = TryBuildResultRow (leftRowIt->second , row, rowStats, result.ShardId );
703+ YQL_ENSURE (IsRowSeqNoValid (leftRowIt->second .SeqNo ));
704+ ResultRowsBySeqNo[leftRowIt->second .SeqNo ].Rows .emplace_back (std::move (resultRow), std::move (rowStats));
705+ }
706+
707+ if (record.GetFinished ()) {
708+ for (const auto & key : pendingKeysIt->second ) {
709+ auto leftRowIt = PendingLeftRowsByKey.find (ExtractKeyPrefix (key));
710+ if (leftRowIt != PendingLeftRowsByKey.end ()) {
711+ leftRowIt->second .PendingReads .erase (record.GetReadId ());
712+
713+ // row is considered processed when all reads are finished
714+ // and at least one right row is found
715+ const bool leftRowProcessed = leftRowIt->second .PendingReads .empty ()
716+ && leftRowIt->second .RightRowExist ;
717+ if (leftRowProcessed) {
718+ YQL_ENSURE (IsRowSeqNoValid (leftRowIt->second .SeqNo ));
719+ ResultRowsBySeqNo[leftRowIt->second .SeqNo ].Completed = true ;
720+ PendingLeftRowsByKey.erase (leftRowIt);
721+ }
722+ }
723+ }
724+
725+ PendingKeysByReadId.erase (pendingKeysIt);
726+ }
682727 }
683728
684729 bool AllRowsProcessed () final {
685730 return UnprocessedRows.empty ()
686731 && UnprocessedKeys.empty ()
687732 && PendingKeysByReadId.empty ()
688- && ReadResults .empty ()
733+ && ResultRowsBySeqNo .empty ()
689734 && PendingLeftRowsByKey.empty ();
690735 }
691736
@@ -724,85 +769,62 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
724769 bool sizeLimitExceeded = false ;
725770 batch.clear ();
726771
727- while (!ReadResults.empty () && !sizeLimitExceeded) {
728- auto & result = ReadResults.front ();
729-
730- for (; result.UnprocessedResultRow < result.ReadResult ->Get ()->GetRowsCount (); ++result.UnprocessedResultRow ) {
731- const auto & row = result.ReadResult ->Get ()->GetCells (result.UnprocessedResultRow );
732- YQL_ENSURE (row.size () <= ReadColumns.size (), " Result columns mismatch" );
733-
734- std::vector<TCell> joinKeyCells (LookupKeyColumns.size ());
735- for (size_t joinKeyIdx = 0 ; joinKeyIdx < LookupKeyColumns.size (); ++joinKeyIdx) {
736- auto it = ReadColumns.find (LookupKeyColumns[joinKeyIdx]->Name );
737- YQL_ENSURE (it != ReadColumns.end ());
738- joinKeyCells[LookupKeyColumns[joinKeyIdx]->KeyOrder ] = row[std::distance (ReadColumns.begin (), it)];
739- }
740-
741- auto leftRowIt = PendingLeftRowsByKey.find (joinKeyCells);
742- YQL_ENSURE (leftRowIt != PendingLeftRowsByKey.end ());
743-
744- if (Strategy == NKqpProto::EStreamLookupStrategy::SEMI_JOIN && leftRowIt->second .RightRowExist ) {
745- // Semi join should return one result row per key
746- continue ;
747- }
772+ // we should process left rows that haven't matches on the right
773+ for (auto leftRowIt = PendingLeftRowsByKey.begin (); leftRowIt != PendingLeftRowsByKey.end ();) {
774+ const bool leftRowShouldBeProcessed = leftRowIt->second .PendingReads .empty ()
775+ && !leftRowIt->second .RightRowExist ;
748776
777+ if (leftRowShouldBeProcessed) {
749778 TReadResultStats rowStats;
750- i64 availableSpace = freeSpace - (i64 )resultStats.ResultBytesCount ;
751- auto resultRow = TryBuildResultRow (leftRowIt->second , row, rowStats, availableSpace, result.ShardId );
779+ auto resultRow = TryBuildResultRow (leftRowIt->second , {}, rowStats);
780+ YQL_ENSURE (IsRowSeqNoValid (leftRowIt->second .SeqNo ));
781+ auto & result = ResultRowsBySeqNo[leftRowIt->second .SeqNo ];
782+ result.Rows .emplace_back (std::move (resultRow), std::move (rowStats));
783+ result.Completed = true ;
784+ PendingLeftRowsByKey.erase (leftRowIt++);
785+ } else {
786+ ++leftRowIt;
787+ }
788+ }
752789
753- if (!resultRow. HasValue () ) {
754- sizeLimitExceeded = true ;
755- break ;
756- }
790+ auto getNextResult = [&]( ) {
791+ if (! ShoulKeepRowsOrder ()) {
792+ return ResultRowsBySeqNo. begin () ;
793+ }
757794
758- batch.push_back (std::move (resultRow));
759- resultStats.Add (rowStats);
795+ return ResultRowsBySeqNo.find (CurrentResultSeqNo);
796+ };
797+
798+ while (!sizeLimitExceeded) {
799+ auto resultIt = getNextResult ();
800+ if (resultIt == ResultRowsBySeqNo.end ()) {
801+ break ;
760802 }
761803
762- if (result.UnprocessedResultRow == result.ReadResult ->Get ()->GetRowsCount ()) {
763- if (result.ReadResult ->Get ()->Record .GetFinished ()) {
764- auto it = PendingKeysByReadId.find (result.ReadResult ->Get ()->Record .GetReadId ());
765- YQL_ENSURE (it != PendingKeysByReadId.end ());
766-
767- for (const auto & range : it->second ) {
768- auto leftRowIt = PendingLeftRowsByKey.find (ExtractKeyPrefix (range));
769- if (leftRowIt != PendingLeftRowsByKey.end ()) {
770- leftRowIt->second .PendingReads .erase (result.ReadResult ->Get ()->Record .GetReadId ());
771-
772- const bool leftRowCanBeDeleted = leftRowIt->second .PendingReads .empty ()
773- && leftRowIt->second .RightRowExist ;
774- if (leftRowCanBeDeleted) {
775- PendingLeftRowsByKey.erase (leftRowIt);
776- }
777- }
778- }
804+ auto & result = resultIt->second ;
805+ for (; result.FirstUnprocessedRow < result.Rows .size (); ++result.FirstUnprocessedRow ) {
806+ auto & row = result.Rows [result.FirstUnprocessedRow ];
779807
780- PendingKeysByReadId.erase (it);
808+ if (resultStats.ResultBytesCount + row.Stats .ResultBytesCount > (ui64)freeSpace) {
809+ sizeLimitExceeded = true ;
810+ break ;
781811 }
782812
783- ReadResults.pop_front ();
813+ batch.emplace_back (std::move (row.Data ));
814+ resultStats.Add (row.Stats );
784815 }
785- }
786816
787- if (!sizeLimitExceeded) {
788- for (auto leftRowIt = PendingLeftRowsByKey.begin (); leftRowIt != PendingLeftRowsByKey.end ();) {
789- const bool leftRowCanBeSent = leftRowIt->second .PendingReads .empty ()
790- && !leftRowIt->second .RightRowExist ;
791-
792- if (leftRowCanBeSent) {
793- TReadResultStats rowStats;
794- i64 availableSpace = freeSpace - (i64 )resultStats.ResultBytesCount ;
795- auto resultRow = TryBuildResultRow (leftRowIt->second , {}, rowStats, availableSpace);
796-
797- if (!resultRow.HasValue ()) {
817+ if (result.FirstUnprocessedRow == result.Rows .size ()) {
818+ if (ShoulKeepRowsOrder ()) {
819+ // we can increment seqNo only if current result is completed
820+ if (result.Completed ) {
821+ ResultRowsBySeqNo.erase (resultIt);
822+ ++CurrentResultSeqNo;
823+ } else {
798824 break ;
799825 }
800-
801- batch.push_back (std::move (resultRow));
802- resultStats.Add (rowStats);
803- PendingLeftRowsByKey.erase (leftRowIt++);
804826 } else {
805- ++leftRowIt ;
827+ ResultRowsBySeqNo. erase (resultIt) ;
806828 }
807829 }
808830 }
@@ -813,17 +835,43 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
813835 ~TKqpJoinRows () {
814836 UnprocessedRows.clear ();
815837 PendingLeftRowsByKey.clear ();
838+ ResultRowsBySeqNo.clear ();
816839 }
817840private:
818841 struct TLeftRowInfo {
819- TLeftRowInfo (NUdf::TUnboxedValue row) : Row(std::move(row)) {
842+ TLeftRowInfo (NUdf::TUnboxedValue row, ui64 seqNo ) : Row(std::move(row)), SeqNo(seqNo ) {
820843 }
821844
822845 NUdf::TUnboxedValue Row;
823846 std::unordered_set<ui64> PendingReads;
824847 bool RightRowExist = false ;
848+ const ui64 SeqNo;
849+ };
850+
851+ struct TResultBatch {
852+ struct TResultRow {
853+ NUdf::TUnboxedValue Data;
854+ TReadResultStats Stats;
855+ };
856+
857+ std::vector<TResultRow> Rows;
858+ ui32 FirstUnprocessedRow = 0 ;
859+ bool Completed = false ;
825860 };
826861
862+ bool ShoulKeepRowsOrder () const {
863+ return Settings.HasKeepRowsOrder () && Settings.GetKeepRowsOrder ();
864+ }
865+
866+ bool IsRowSeqNoValid (const ui64& seqNo) const {
867+ if (!ShoulKeepRowsOrder ()) {
868+ return true ;
869+ }
870+
871+ // we should check row seqNo only if we need to keep the order
872+ return seqNo >= CurrentResultSeqNo;
873+ }
874+
827875 void FillReadRequest (ui64 readId, THolder<TEvDataShard::TEvRead>& request, const std::vector<TOwnedTableRange>& ranges) {
828876 auto & record = request->Record ;
829877
@@ -887,7 +935,7 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
887935 }
888936
889937 NUdf::TUnboxedValue TryBuildResultRow (TLeftRowInfo& leftRowInfo, TConstArrayRef<TCell> rightRow,
890- TReadResultStats& rowStats, i64 freeSpace , TMaybe<ui64> shardId = {}) {
938+ TReadResultStats& rowStats, TMaybe<ui64> shardId = {}) {
891939
892940 NUdf::TUnboxedValue* resultRowItems = nullptr ;
893941 auto resultRow = HolderFactory.CreateDirectArrayHolder (2 , resultRowItems);
@@ -938,12 +986,6 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
938986 rowStats.ResultRowsCount += 1 ;
939987 rowStats.ResultBytesCount += leftRowSize + rightRowSize;
940988
941- if (rowStats.ResultBytesCount > (ui64)freeSpace) {
942- resultRow.DeleteUnreferenced ();
943- rowStats.Clear ();
944- return NUdf::TUnboxedValuePod ();
945- }
946-
947989 return resultRow;
948990 }
949991
@@ -953,7 +995,9 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
953995 std::deque<TOwnedTableRange> UnprocessedKeys;
954996 std::unordered_map<ui64, std::vector<TOwnedTableRange>> PendingKeysByReadId;
955997 absl::flat_hash_map<TOwnedCellVec, TLeftRowInfo, TKeyHash, TKeyEq> PendingLeftRowsByKey;
956- std::deque<TShardReadResult> ReadResults;
998+ std::unordered_map<ui64, TResultBatch> ResultRowsBySeqNo;
999+ ui64 InputRowSeqNo = 0 ;
1000+ ui64 CurrentResultSeqNo = 0 ;
9571001};
9581002
9591003std::unique_ptr<TKqpStreamLookupWorker> CreateStreamLookupWorker (NKikimrKqp::TKqpStreamLookupSettings&& settings,
0 commit comments