@@ -39,6 +39,7 @@ struct TMatchRecognizeProcessorParameters {
3939 TMeasureInputColumnOrder MeasureInputColumnOrder;
4040 TComputationNodePtrVector Measures;
4141 TOutputColumnOrder OutputColumnOrder;
42+ TAfterMatchSkipTo SkipTo;
4243};
4344
4445class TStreamingMatchRecognize {
@@ -49,14 +50,12 @@ class TStreamingMatchRecognize {
4950 NUdf::TUnboxedValue&& partitionKey,
5051 const TMatchRecognizeProcessorParameters& parameters,
5152 TNfaTransitionGraph::TPtr nfaTransitions,
52- const TContainerCacheOnContext& cache,
53- bool afterMatchSkipPastLastRow
53+ const TContainerCacheOnContext& cache
5454 )
5555 : PartitionKey(std::move(partitionKey))
5656 , Parameters(parameters)
5757 , Nfa(nfaTransitions, parameters.MatchedVarsArg, parameters.Defines)
5858 , Cache(cache)
59- , AfterMatchSkipPastLastRow(afterMatchSkipPastLastRow)
6059 {
6160 }
6261
@@ -96,7 +95,7 @@ class TStreamingMatchRecognize {
9695 break ;
9796 }
9897 }
99- if (AfterMatchSkipPastLastRow ) {
98+ if (EAfterMatchSkipTo::PastLastRow == Parameters. SkipTo . To ) {
10099 Nfa.Clear ();
101100 }
102101 return result;
@@ -111,15 +110,13 @@ class TStreamingMatchRecognize {
111110 Rows.Save (serializer);
112111 Nfa.Save (serializer);
113112 serializer.Write (MatchNumber);
114- serializer.Write (AfterMatchSkipPastLastRow);
115113 }
116114
117115 void Load (TMrInputSerializer& serializer) {
118116 // PartitionKey passed in contructor.
119117 Rows.Load (serializer);
120118 Nfa.Load (serializer);
121119 MatchNumber = serializer.Read <ui64>();
122- AfterMatchSkipPastLastRow = serializer.Read <bool >();
123120 }
124121
125122private:
@@ -129,7 +126,6 @@ class TStreamingMatchRecognize {
129126 TNfa Nfa;
130127 const TContainerCacheOnContext& Cache;
131128 ui64 MatchNumber = 0 ;
132- bool AfterMatchSkipPastLastRow;
133129};
134130
135131class TStateForNonInterleavedPartitions
@@ -145,8 +141,7 @@ class TStateForNonInterleavedPartitions
145141 const TContainerCacheOnContext& cache,
146142 TComputationContext &ctx,
147143 TType* rowType,
148- const TMutableObjectOverBoxedValue<TValuePackerBoxed>& rowPacker,
149- bool afterMatchSkipPastLastRow
144+ const TMutableObjectOverBoxedValue<TValuePackerBoxed>& rowPacker
150145 )
151146 : TComputationValue<TStateForNonInterleavedPartitions>(memInfo)
152147 , InputRowArg(inputRowArg)
@@ -158,7 +153,6 @@ class TStateForNonInterleavedPartitions
158153 , Terminating(false )
159154 , SerializerContext(ctx, rowType, rowPacker)
160155 , Ctx(ctx)
161- , AfterMatchSkipPastLastRow(afterMatchSkipPastLastRow)
162156 {}
163157
164158 NUdf::TUnboxedValue Save () const override {
@@ -194,8 +188,7 @@ class TStateForNonInterleavedPartitions
194188 std::move (key),
195189 Parameters,
196190 RowPatternConfiguration,
197- Cache,
198- AfterMatchSkipPastLastRow
191+ Cache
199192 ));
200193 PartitionHandler->Load (in);
201194 }
@@ -261,8 +254,7 @@ class TStateForNonInterleavedPartitions
261254 std::move (partitionKey),
262255 Parameters,
263256 RowPatternConfiguration,
264- Cache,
265- AfterMatchSkipPastLastRow));
257+ Cache));
266258 PartitionHandler->ProcessInputRow (std::move (temp), ctx);
267259 }
268260 if (Terminating) {
@@ -283,7 +275,6 @@ class TStateForNonInterleavedPartitions
283275 bool Terminating;
284276 TSerializerContext SerializerContext;
285277 TComputationContext& Ctx;
286- bool AfterMatchSkipPastLastRow;
287278};
288279
289280class TStateForInterleavedPartitions
@@ -301,8 +292,7 @@ class TStateForInterleavedPartitions
301292 const TContainerCacheOnContext& cache,
302293 TComputationContext &ctx,
303294 TType* rowType,
304- const TMutableObjectOverBoxedValue<TValuePackerBoxed>& rowPacker,
305- bool afterMatchSkipPastLastRow
295+ const TMutableObjectOverBoxedValue<TValuePackerBoxed>& rowPacker
306296 )
307297 : TComputationValue<TStateForInterleavedPartitions>(memInfo)
308298 , InputRowArg(inputRowArg)
@@ -313,7 +303,6 @@ class TStateForInterleavedPartitions
313303 , Cache(cache)
314304 , SerializerContext(ctx, rowType, rowPacker)
315305 , Ctx(ctx)
316- , AfterMatchSkipPastLastRow(afterMatchSkipPastLastRow)
317306 {}
318307
319308 NUdf::TUnboxedValue Save () const override {
@@ -350,8 +339,7 @@ class TStateForInterleavedPartitions
350339 std::move (key),
351340 Parameters,
352341 NfaTransitionGraph,
353- Cache,
354- AfterMatchSkipPastLastRow));
342+ Cache));
355343 pair.first ->second ->Load (in);
356344 }
357345
@@ -418,8 +406,7 @@ class TStateForInterleavedPartitions
418406 std::move (partitionKey),
419407 Parameters,
420408 NfaTransitionGraph,
421- Cache,
422- AfterMatchSkipPastLastRow
409+ Cache
423410 ));
424411 }
425412 }
@@ -438,7 +425,6 @@ class TStateForInterleavedPartitions
438425 const TContainerCacheOnContext& Cache;
439426 TSerializerContext SerializerContext;
440427 TComputationContext& Ctx;
441- bool AfterMatchSkipPastLastRow;
442428};
443429
444430template <class State >
@@ -450,8 +436,7 @@ class TMatchRecognizeWrapper : public TStatefulFlowComputationNode<TMatchRecogni
450436 IComputationNode *partitionKey,
451437 TType* partitionKeyType,
452438 const TMatchRecognizeProcessorParameters& parameters,
453- TType* rowType,
454- bool afterMatchSkipPastLastRow
439+ TType* rowType
455440 )
456441 :TBaseComputation(mutables, inputFlow, kind, EValueRepresentation::Embedded)
457442 , InputFlow(inputFlow)
@@ -462,7 +447,6 @@ class TMatchRecognizeWrapper : public TStatefulFlowComputationNode<TMatchRecogni
462447 , Cache(mutables)
463448 , RowType(rowType)
464449 , RowPacker(mutables)
465- , AfterMatchSkipPastLastRow(afterMatchSkipPastLastRow)
466450 {}
467451
468452 NUdf::TUnboxedValue DoCalculate (NUdf::TUnboxedValue &stateValue, TComputationContext &ctx) const {
@@ -475,8 +459,7 @@ class TMatchRecognizeWrapper : public TStatefulFlowComputationNode<TMatchRecogni
475459 Cache,
476460 ctx,
477461 RowType,
478- RowPacker,
479- AfterMatchSkipPastLastRow
462+ RowPacker
480463 );
481464 } else if (stateValue.HasValue ()) {
482465 MKQL_ENSURE (stateValue.IsBoxed (), " Expected boxed value" );
@@ -491,8 +474,7 @@ class TMatchRecognizeWrapper : public TStatefulFlowComputationNode<TMatchRecogni
491474 Cache,
492475 ctx,
493476 RowType,
494- RowPacker,
495- AfterMatchSkipPastLastRow
477+ RowPacker
496478 );
497479 state.Load2 (stateValue);
498480 stateValue = state;
@@ -541,7 +523,6 @@ class TMatchRecognizeWrapper : public TStatefulFlowComputationNode<TMatchRecogni
541523 const TContainerCacheOnContext Cache;
542524 TType* const RowType;
543525 TMutableObjectOverBoxedValue<TValuePackerBoxed> RowPacker;
544- bool AfterMatchSkipPastLastRow;
545526};
546527
547528TOutputColumnOrder GetOutputColumnOrder (TRuntimeNode partitionKyeColumnsIndexes, TRuntimeNode measureColumnsIndexes) {
@@ -669,7 +650,11 @@ IComputationNode* WrapMatchRecognizeCore(TCallable& callable, const TComputation
669650 defines.push_back (callable.GetInput (inputIndex++));
670651 }
671652 const auto & streamingMode = callable.GetInput (inputIndex++);
672- const auto & afterMatchSkipPastLastRow = callable.GetInput (inputIndex++);
653+ NYql::NMatchRecognize::TAfterMatchSkipTo skipTo = {NYql::NMatchRecognize::EAfterMatchSkipTo::NextRow, " " };
654+ if (callable.GetInputsCount () - inputIndex >= 2 ) {
655+ skipTo.To = static_cast <EAfterMatchSkipTo>(AS_VALUE (TDataLiteral, callable.GetInput (inputIndex++))->AsValue ().Get <i32 >());
656+ skipTo.Var = AS_VALUE (TDataLiteral, callable.GetInput (inputIndex++))->AsValue ().AsStringRef ();
657+ }
673658 MKQL_ENSURE (callable.GetInputsCount () == inputIndex, " Wrong input count" );
674659
675660 const auto & [vars, varsLookup] = ConvertListOfStrings (varNames);
@@ -690,6 +675,7 @@ IComputationNode* WrapMatchRecognizeCore(TCallable& callable, const TComputation
690675 )
691676 , ConvertVectorOfCallables (measures, ctx)
692677 , GetOutputColumnOrder (partitionColumnIndexes, measureColumnIndexes)
678+ , skipTo
693679 };
694680 if (AS_VALUE (TDataLiteral, streamingMode)->AsValue ().Get <bool >()) {
695681 return new TMatchRecognizeWrapper<TStateForInterleavedPartitions>(ctx.Mutables
@@ -700,7 +686,6 @@ IComputationNode* WrapMatchRecognizeCore(TCallable& callable, const TComputation
700686 , partitionKeySelector.GetStaticType ()
701687 , std::move (parameters)
702688 , rowType
703- , AS_VALUE (TDataLiteral, afterMatchSkipPastLastRow)->AsValue ().Get <bool >()
704689 );
705690 } else {
706691 return new TMatchRecognizeWrapper<TStateForNonInterleavedPartitions>(ctx.Mutables
@@ -711,7 +696,6 @@ IComputationNode* WrapMatchRecognizeCore(TCallable& callable, const TComputation
711696 , partitionKeySelector.GetStaticType ()
712697 , std::move (parameters)
713698 , rowType
714- , AS_VALUE (TDataLiteral, afterMatchSkipPastLastRow)->AsValue ().Get <bool >()
715699 );
716700 }
717701}
0 commit comments