@@ -34,112 +34,14 @@ struct TMatchRecognizeProcessorParameters {
3434 TMeasureInputColumnOrder MeasureInputColumnOrder;
3535 TComputationNodePtrVector Measures;
3636 TOutputColumnOrder OutputColumnOrder;
37- };
38-
39- class TBackTrackingMatchRecognize {
40- using TPartitionList = TSimpleList;
41- using TRange = TPartitionList::TRange;
42- using TMatchedVars = TMatchedVars<TRange>;
43- public:
44- // TODO(YQL-16486): create a tree for backtracking(replace var names with indexes)
45-
46- struct TPatternConfiguration {};
47-
48- struct TPatternConfigurationBuilder {
49- using TPatternConfigurationPtr = std::shared_ptr<TPatternConfiguration>;
50- static TPatternConfigurationPtr Create (const TRowPattern& pattern, const THashMap<TString, size_t >& varNameToIndex) {
51- Y_UNUSED (pattern);
52- Y_UNUSED (varNameToIndex);
53- return std::make_shared<TPatternConfiguration>();
54- }
55- };
56-
57- TBackTrackingMatchRecognize (
58- NUdf::TUnboxedValue&& partitionKey,
59- const TMatchRecognizeProcessorParameters& parameters,
60- const TPatternConfigurationBuilder::TPatternConfigurationPtr pattern,
61- const TContainerCacheOnContext& cache
62- )
63- : PartitionKey(std::move(partitionKey))
64- , Parameters(parameters)
65- , Cache(cache)
66- , CurMatchedVars(parameters.Defines.size())
67- , MatchNumber(0 )
68- {
69- // TODO(YQL-16486)
70- Y_UNUSED (pattern);
71- }
72-
73- bool ProcessInputRow (NUdf::TUnboxedValue&& row, TComputationContext& ctx) {
74- Y_UNUSED (ctx);
75- Rows.Append (std::move (row));
76- return false ;
77- }
78- NUdf::TUnboxedValue GetOutputIfReady (TComputationContext& ctx) {
79- if (Matches.empty ())
80- return NUdf::TUnboxedValue{};
81- Parameters.MatchedVarsArg ->SetValue (ctx, ToValue (ctx.HolderFactory , std::move (Matches.front ())));
82- Matches.pop_front ();
83- Parameters.MeasureInputDataArg ->SetValue (ctx, ctx.HolderFactory .Create <TMeasureInputDataValue>(
84- Parameters.InputDataArg ->GetValue (ctx),
85- Parameters.MeasureInputColumnOrder ,
86- Parameters.MatchedVarsArg ->GetValue (ctx),
87- Parameters.VarNames ,
88- ++MatchNumber
89- ));
90- NUdf::TUnboxedValue *itemsPtr = nullptr ;
91- const auto result = Cache.NewArray (ctx, Parameters.OutputColumnOrder .size (), itemsPtr);
92- for (auto const & c: Parameters.OutputColumnOrder ) {
93- switch (c.first ) {
94- case EOutputColumnSource::Measure:
95- *itemsPtr++ = Parameters.Measures [c.second ]->GetValue (ctx);
96- break ;
97- case EOutputColumnSource::PartitionKey:
98- *itemsPtr++ = PartitionKey.GetElement (c.second );
99- break ;
100- }
101- }
102- return result;
103- }
104- bool ProcessEndOfData (TComputationContext& ctx) {
105- // Assume, that data moved to IComputationExternalNode node, will not be modified or released
106- // till the end of the current function
107- auto rowsSize = Rows.Size ();
108- Parameters.InputDataArg ->SetValue (ctx, ctx.HolderFactory .Create <TListValue<TPartitionList>>(Rows));
109- for (size_t i = 0 ; i != rowsSize; ++i) {
110- Parameters.CurrentRowIndexArg ->SetValue (ctx, NUdf::TUnboxedValuePod (static_cast <ui64>(i)));
111- for (size_t v = 0 ; v != Parameters.Defines .size (); ++v) {
112- const auto &d = Parameters.Defines [v]->GetValue (ctx);
113- if (d && d.GetOptionalValue ().Get <bool >()) {
114- Extend (CurMatchedVars[v], TRange{i});
115- }
116- }
117- // for the sake of dummy usage assume non-overlapped matches at every 5th row of any partition
118- if (i % 5 == 0 ) {
119- TMatchedVars temp;
120- temp.swap (CurMatchedVars);
121- Matches.emplace_back (std::move (temp));
122- CurMatchedVars.resize (Parameters.Defines .size ());
123- }
124- }
125- return not Matches.empty ();
126- }
127- private:
128- const NUdf::TUnboxedValue PartitionKey;
129- const TMatchRecognizeProcessorParameters& Parameters;
130- const TContainerCacheOnContext& Cache;
131- TSimpleList Rows;
132- TMatchedVars CurMatchedVars;
133- std::deque<TMatchedVars, TMKQLAllocator<TMatchedVars>> Matches;
134- ui64 MatchNumber;
37+ TAfterMatchSkipTo SkipTo;
13538};
13639
13740class TStreamingMatchRecognize {
13841 using TPartitionList = TSparseList;
13942 using TRange = TPartitionList::TRange;
14043 using TMatchedVars = TMatchedVars<TRange>;
14144public:
142- using TPatternConfigurationBuilder = TNfaTransitionGraphBuilder;
14345 TStreamingMatchRecognize (
14446 NUdf::TUnboxedValue&& partitionKey,
14547 const TMatchRecognizeProcessorParameters& parameters,
@@ -183,6 +85,9 @@ class TStreamingMatchRecognize {
18385 break ;
18486 }
18587 }
88+ if (EAfterMatchSkipTo::PastLastRow == Parameters.SkipTo .To ) {
89+ Nfa.Clear ();
90+ }
18691 return result;
18792 }
18893 bool ProcessEndOfData (TComputationContext& ctx) {
@@ -198,11 +103,9 @@ class TStreamingMatchRecognize {
198103 ui64 MatchNumber = 0 ;
199104};
200105
201- template <typename Algo>
202106class TStateForNonInterleavedPartitions
203- : public TComputationValue<TStateForNonInterleavedPartitions<Algo> >
107+ : public TComputationValue<TStateForNonInterleavedPartitions>
204108{
205- using TRowPatternConfigurationBuilder = typename Algo::TPatternConfigurationBuilder;
206109public:
207110 TStateForNonInterleavedPartitions (
208111 TMemoryUsageInfo* memInfo,
@@ -217,10 +120,15 @@ class TStateForNonInterleavedPartitions
217120 , PartitionKey(partitionKey)
218121 , PartitionKeyPacker(true , partitionKeyType)
219122 , Parameters(parameters)
220- , RowPatternConfiguration(TRowPatternConfigurationBuilder ::Create(parameters.Pattern, parameters.VarNamesLookup))
123+ , RowPatternConfiguration(TNfaTransitionGraphBuilder ::Create(parameters.Pattern, parameters.VarNamesLookup))
221124 , Cache(cache)
222125 , Terminating(false )
223126 {}
127+
128+ bool HasListItems () const override {
129+ return false ;
130+ }
131+
224132 bool ProcessInputRow (NUdf::TUnboxedValue&& row, TComputationContext& ctx) {
225133 MKQL_ENSURE (not DelayedRow, " Internal logic error" ); // we're finalizing previous partition
226134 InputRowArg->SetValue (ctx, NUdf::TUnboxedValue (row));
@@ -264,12 +172,11 @@ class TStateForNonInterleavedPartitions
264172 InputRowArg->SetValue (ctx, NUdf::TUnboxedValue (temp));
265173 auto partitionKey = PartitionKey->GetValue (ctx);
266174 CurPartitionPackedKey = PartitionKeyPacker.Pack (partitionKey);
267- PartitionHandler.reset (new Algo (
175+ PartitionHandler.reset (new TStreamingMatchRecognize (
268176 std::move (partitionKey),
269177 Parameters,
270178 RowPatternConfiguration,
271- Cache
272- ));
179+ Cache));
273180 PartitionHandler->ProcessInputRow (std::move (temp), ctx);
274181 }
275182 if (Terminating) {
@@ -279,12 +186,12 @@ class TStateForNonInterleavedPartitions
279186 }
280187private:
281188 TString CurPartitionPackedKey;
282- std::unique_ptr<Algo > PartitionHandler;
189+ std::unique_ptr<TStreamingMatchRecognize > PartitionHandler;
283190 IComputationExternalNode* InputRowArg;
284191 IComputationNode* PartitionKey;
285192 TValuePackerGeneric<false > PartitionKeyPacker;
286193 const TMatchRecognizeProcessorParameters& Parameters;
287- const typename TRowPatternConfigurationBuilder::TPatternConfigurationPtr RowPatternConfiguration;
194+ const TNfaTransitionGraph::TPtr RowPatternConfiguration;
288195 const TContainerCacheOnContext& Cache;
289196 NUdf::TUnboxedValue DelayedRow;
290197 bool Terminating;
@@ -576,6 +483,11 @@ IComputationNode* WrapMatchRecognizeCore(TCallable& callable, const TComputation
576483 defines.push_back (callable.GetInput (inputIndex++));
577484 }
578485 const auto & streamingMode = callable.GetInput (inputIndex++);
486+ NYql::NMatchRecognize::TAfterMatchSkipTo skipTo = {NYql::NMatchRecognize::EAfterMatchSkipTo::NextRow, " " };
487+ if (inputIndex + 2 <= callable.GetInputsCount ()) {
488+ skipTo.To = static_cast <EAfterMatchSkipTo>(AS_VALUE (TDataLiteral, callable.GetInput (inputIndex++))->AsValue ().Get <i32 >());
489+ skipTo.Var = AS_VALUE (TDataLiteral, callable.GetInput (inputIndex++))->AsValue ().AsStringRef ();
490+ }
579491 MKQL_ENSURE (callable.GetInputsCount () == inputIndex, " Wrong input count" );
580492
581493 const auto & [vars, varsLookup] = ConvertListOfStrings (varNames);
@@ -595,6 +507,7 @@ IComputationNode* WrapMatchRecognizeCore(TCallable& callable, const TComputation
595507 )
596508 , ConvertVectorOfCallables (measures, ctx)
597509 , GetOutputColumnOrder (partitionColumnIndexes, measureColumnIndexes)
510+ , skipTo
598511 };
599512 if (AS_VALUE (TDataLiteral, streamingMode)->AsValue ().Get <bool >()) {
600513 return new TMatchRecognizeWrapper<TStateForInterleavedPartitions>(ctx.Mutables
@@ -606,26 +519,14 @@ IComputationNode* WrapMatchRecognizeCore(TCallable& callable, const TComputation
606519 , std::move (parameters)
607520 );
608521 } else {
609- const bool useNfaForTables = true ; // TODO(YQL-16486) get this flag from an optimizer
610- if (useNfaForTables) {
611- return new TMatchRecognizeWrapper<TStateForNonInterleavedPartitions<TStreamingMatchRecognize>>(ctx.Mutables
612- , GetValueRepresentation (inputFlow.GetStaticType ())
613- , LocateNode (ctx.NodeLocator , *inputFlow.GetNode ())
614- , static_cast <IComputationExternalNode*>(LocateNode (ctx.NodeLocator , *inputRowArg.GetNode ()))
615- , LocateNode (ctx.NodeLocator , *partitionKeySelector.GetNode ())
616- , partitionKeySelector.GetStaticType ()
617- , std::move (parameters)
618- );
619- } else {
620- return new TMatchRecognizeWrapper<TStateForNonInterleavedPartitions<TBackTrackingMatchRecognize>>(ctx.Mutables
621- , GetValueRepresentation (inputFlow.GetStaticType ())
622- , LocateNode (ctx.NodeLocator , *inputFlow.GetNode ())
623- , static_cast <IComputationExternalNode*>(LocateNode (ctx.NodeLocator , *inputRowArg.GetNode ()))
624- , LocateNode (ctx.NodeLocator , *partitionKeySelector.GetNode ())
625- , partitionKeySelector.GetStaticType ()
626- , std::move (parameters)
627- );
628- }
522+ return new TMatchRecognizeWrapper<TStateForNonInterleavedPartitions>(ctx.Mutables
523+ , GetValueRepresentation (inputFlow.GetStaticType ())
524+ , LocateNode (ctx.NodeLocator , *inputFlow.GetNode ())
525+ , static_cast <IComputationExternalNode*>(LocateNode (ctx.NodeLocator , *inputRowArg.GetNode ()))
526+ , LocateNode (ctx.NodeLocator , *partitionKeySelector.GetNode ())
527+ , partitionKeySelector.GetStaticType ()
528+ , std::move (parameters)
529+ );
629530 }
630531}
631532
0 commit comments