1010#include < ydb/library/arrow_kernels/operations.h>
1111#include < ydb/library/formats/arrow/switch/switch_type.h>
1212
13+ #include < library/cpp/string_utils/quote/quote.h>
1314#include < util/string/builder.h>
15+ #include < util/string/escape.h>
1416#include < yql/essentials/core/arrow_kernels/request/request.h>
1517
1618namespace NKikimr ::NArrow::NSSA::NGraph::NOptimization {
@@ -199,9 +201,10 @@ TConclusion<bool> TGraph::OptimizeMergeFetching(TGraphNode* baseNode) {
199201 if (!i.second ->Is (EProcessorType::FetchOriginalData)) {
200202 continue ;
201203 }
202- if (i.second ->GetProcessorAs <TOriginalColumnDataProcessor>()->GetDataAddresses ().size () +
203- i.second ->GetProcessorAs <TOriginalColumnDataProcessor>()->GetIndexContext ().size () +
204- i.second ->GetProcessorAs <TOriginalColumnDataProcessor>()->GetHeaderContext ().size () > 1 ) {
204+ if (i.second ->GetProcessorAs <TOriginalColumnDataProcessor>()->GetDataAddresses ().size () +
205+ i.second ->GetProcessorAs <TOriginalColumnDataProcessor>()->GetIndexContext ().size () +
206+ i.second ->GetProcessorAs <TOriginalColumnDataProcessor>()->GetHeaderContext ().size () >
207+ 1 ) {
205208 continue ;
206209 }
207210 if (i.second ->GetProcessorAs <TOriginalColumnDataProcessor>()->GetDataAddresses ().size ()) {
@@ -220,8 +223,7 @@ TConclusion<bool> TGraph::OptimizeMergeFetching(TGraphNode* baseNode) {
220223 for (auto && i : dataAddresses) {
221224 columnIds.emplace (i->GetProcessorAs <TOriginalColumnDataProcessor>()->GetOutputColumnIdOnce ());
222225 }
223- auto proc =
224- std::make_shared<TOriginalColumnDataProcessor>(std::vector<ui32>(columnIds.begin (), columnIds.end ()));
226+ auto proc = std::make_shared<TOriginalColumnDataProcessor>(std::vector<ui32>(columnIds.begin (), columnIds.end ()));
225227 for (auto && i : dataAddresses) {
226228 for (auto && addr : i->GetProcessorAs <TOriginalColumnDataProcessor>()->GetDataAddresses ()) {
227229 proc->Add (addr.second );
@@ -230,7 +232,7 @@ TConclusion<bool> TGraph::OptimizeMergeFetching(TGraphNode* baseNode) {
230232 auto nodeFetch = AddNode (proc);
231233 FetchersMerged.emplace (nodeFetch->GetIdentifier ());
232234 for (auto && i : dataAddresses) {
233- for (auto && to: i->GetOutputEdges ()) {
235+ for (auto && to : i->GetOutputEdges ()) {
234236 AddEdge (nodeFetch.get (), to.second , to.first .GetResourceId ());
235237 }
236238 RemoveNode (i->GetIdentifier ());
@@ -245,8 +247,7 @@ TConclusion<bool> TGraph::OptimizeMergeFetching(TGraphNode* baseNode) {
245247 for (auto && i : headers) {
246248 columnIds.emplace (i->GetProcessorAs <TOriginalColumnDataProcessor>()->GetOutputColumnIdOnce ());
247249 }
248- auto proc =
249- std::make_shared<TOriginalColumnDataProcessor>(std::vector<ui32>(columnIds.begin (), columnIds.end ()));
250+ auto proc = std::make_shared<TOriginalColumnDataProcessor>(std::vector<ui32>(columnIds.begin (), columnIds.end ()));
250251 for (auto && i : indexes) {
251252 for (auto && addr : i->GetProcessorAs <TOriginalColumnDataProcessor>()->GetIndexContext ()) {
252253 proc->Add (addr.second );
@@ -361,11 +362,11 @@ TConclusion<bool> TGraph::OptimizeConditionsForIndexes(TGraphNode* condNode) {
361362 if (condNode->GetProcessor ()->GetProcessorType () != EProcessorType::Calculation) {
362363 return false ;
363364 }
364- if (condNode->GetProcessor ()->GetInput ().size () != 2 ) {
365+ auto calc = condNode->GetProcessorAs <TCalculationProcessor>();
366+ if (!calc->GetKernelLogic ()) {
365367 return false ;
366368 }
367- auto calc = condNode->GetProcessorAs <TCalculationProcessor>();
368- if (!calc->GetYqlOperationId ()) {
369+ if (condNode->GetProcessor ()->GetInput ().size () != 2 ) {
369370 return false ;
370371 }
371372 if (condNode->GetOutputEdges ().size () != 1 ) {
@@ -376,17 +377,7 @@ TConclusion<bool> TGraph::OptimizeConditionsForIndexes(TGraphNode* condNode) {
376377 if (constNode->GetProcessor ()->GetProcessorType () != EProcessorType::Const) {
377378 return false ;
378379 }
379- if (!!calc->GetKernelLogic ()) {
380- if (!calc->GetKernelLogic ()->IsBoolInResult ()) {
381- return false ;
382- }
383- }
384- if (calc->GetYqlOperationId ()) {
385- if (!IsBoolResultYqlOperator ((NYql::TKernelRequestBuilder::EBinaryOp)*calc->GetYqlOperationId ())) {
386- return false ;
387- }
388- }
389- if (!calc->GetYqlOperationId () && !calc->GetKernelLogic ()) {
380+ if (!calc->GetKernelLogic ()->IsBoolInResult ()) {
390381 return false ;
391382 }
392383 std::optional<TResourceAddress> dataAddr = GetOriginalAddress (dataNode);
@@ -395,63 +386,44 @@ TConclusion<bool> TGraph::OptimizeConditionsForIndexes(TGraphNode* condNode) {
395386 }
396387 auto * dest = condNode->GetOutputEdges ().begin ()->second ;
397388 const ui32 destResourceId = condNode->GetOutputEdges ().begin ()->first .GetResourceId ();
398- if ((NYql::TKernelRequestBuilder::EBinaryOp)*calc->GetYqlOperationId () == NYql::TKernelRequestBuilder::EBinaryOp::Equals ||
399- (NYql::TKernelRequestBuilder::EBinaryOp)*calc->GetYqlOperationId () == NYql::TKernelRequestBuilder::EBinaryOp::StartsWith ||
400- (NYql::TKernelRequestBuilder::EBinaryOp)*calc->GetYqlOperationId () == NYql::TKernelRequestBuilder::EBinaryOp::EndsWith ||
401- (NYql::TKernelRequestBuilder::EBinaryOp)*calc->GetYqlOperationId () == NYql::TKernelRequestBuilder::EBinaryOp::StringContains) {
402- if (!IndexesConstructed.emplace (condNode->GetIdentifier ()).second ) {
403- return false ;
404- }
405- RemoveEdge (condNode, dest, destResourceId);
406-
407- const EIndexCheckOperation indexOperation = [&]() {
408- if ((NYql::TKernelRequestBuilder::EBinaryOp)*calc->GetYqlOperationId () == NYql::TKernelRequestBuilder::EBinaryOp::Equals) {
409- return EIndexCheckOperation::Equals;
410- }
411- if ((NYql::TKernelRequestBuilder::EBinaryOp)*calc->GetYqlOperationId () == NYql::TKernelRequestBuilder::EBinaryOp::StartsWith) {
412- return EIndexCheckOperation::StartsWith;
413- }
414- if ((NYql::TKernelRequestBuilder::EBinaryOp)*calc->GetYqlOperationId () == NYql::TKernelRequestBuilder::EBinaryOp::EndsWith) {
415- return EIndexCheckOperation::EndsWith;
416- }
417- if ((NYql::TKernelRequestBuilder::EBinaryOp)*calc->GetYqlOperationId () == NYql::TKernelRequestBuilder::EBinaryOp::StringContains) {
418- return EIndexCheckOperation::Contains;
419- }
420- return EIndexCheckOperation::Contains;
421- AFL_VERIFY (false );
422- }();
423-
424- const ui32 resourceIdxFetch = BuildNextResourceId ();
425- IDataSource::TFetchIndexContext indexContext (dataAddr->GetColumnId (),
426- IDataSource::TFetchIndexContext::TOperationsBySubColumn ().Add (dataAddr->GetSubColumnName (), indexOperation));
427- auto indexFetchProc = std::make_shared<TOriginalColumnDataProcessor>(resourceIdxFetch, indexContext);
428- auto indexFetchNode = AddNode (indexFetchProc);
429- RegisterProducer (resourceIdxFetch, indexFetchNode.get ());
430-
431- const ui32 resourceIdIndexToAnd = BuildNextResourceId ();
432- IDataSource::TCheckIndexContext checkIndexContext (dataAddr->GetColumnId (), dataAddr->GetSubColumnName (), indexOperation);
433- auto indexCheckProc = std::make_shared<TIndexCheckerProcessor>(
434- resourceIdxFetch, constNode->GetProcessor ()->GetOutputColumnIdOnce (), checkIndexContext, resourceIdIndexToAnd);
435- auto indexProcNode = AddNode (indexCheckProc);
436- RegisterProducer (resourceIdIndexToAnd, indexProcNode.get ());
437- AddEdge (indexFetchNode.get (), indexProcNode.get (), resourceIdxFetch);
438- AddEdge (constNode, indexProcNode.get (), constNode->GetProcessor ()->GetOutputColumnIdOnce ());
439-
440- const ui32 resourceIdEqToAnd = BuildNextResourceId ();
441- RegisterProducer (resourceIdEqToAnd, condNode);
442- calc->SetOutputResourceIdOnce (resourceIdEqToAnd);
443-
444- auto andProcessor = std::make_shared<TStreamLogicProcessor>(TColumnChainInfo::BuildVector ({ resourceIdEqToAnd, resourceIdIndexToAnd }),
445- TColumnChainInfo (destResourceId), NKernels::EOperation::And);
446- auto andNode = AddNode (andProcessor);
447- AddEdge (andNode.get (), dest, destResourceId);
448-
449- AddEdge (indexProcNode.get (), andNode.get (), resourceIdIndexToAnd);
450- AddEdge (condNode, andNode.get (), resourceIdEqToAnd);
451- ResetProducer (destResourceId, andNode.get ());
452- return true ;
389+ auto indexChecker = calc->GetKernelLogic ()->GetIndexCheckerOperation ();
390+ if (!indexChecker) {
391+ return false ;
453392 }
454- return false ;
393+ if (!IndexesConstructed.emplace (condNode->GetIdentifier ()).second ) {
394+ return false ;
395+ }
396+ RemoveEdge (condNode, dest, destResourceId);
397+
398+ const ui32 resourceIdxFetch = BuildNextResourceId ();
399+ IDataSource::TFetchIndexContext indexContext (
400+ dataAddr->GetColumnId (), IDataSource::TFetchIndexContext::TOperationsBySubColumn ().Add (dataAddr->GetSubColumnName (), *indexChecker));
401+ auto indexFetchProc = std::make_shared<TOriginalColumnDataProcessor>(resourceIdxFetch, indexContext);
402+ auto indexFetchNode = AddNode (indexFetchProc);
403+ RegisterProducer (resourceIdxFetch, indexFetchNode.get ());
404+
405+ const ui32 resourceIdIndexToAnd = BuildNextResourceId ();
406+ IDataSource::TCheckIndexContext checkIndexContext (dataAddr->GetColumnId (), dataAddr->GetSubColumnName (), *indexChecker);
407+ auto indexCheckProc = std::make_shared<TIndexCheckerProcessor>(
408+ resourceIdxFetch, constNode->GetProcessor ()->GetOutputColumnIdOnce (), checkIndexContext, resourceIdIndexToAnd);
409+ auto indexProcNode = AddNode (indexCheckProc);
410+ RegisterProducer (resourceIdIndexToAnd, indexProcNode.get ());
411+ AddEdge (indexFetchNode.get (), indexProcNode.get (), resourceIdxFetch);
412+ AddEdge (constNode, indexProcNode.get (), constNode->GetProcessor ()->GetOutputColumnIdOnce ());
413+
414+ const ui32 resourceIdEqToAnd = BuildNextResourceId ();
415+ RegisterProducer (resourceIdEqToAnd, condNode);
416+ calc->SetOutputResourceIdOnce (resourceIdEqToAnd);
417+
418+ auto andProcessor = std::make_shared<TStreamLogicProcessor>(
419+ TColumnChainInfo::BuildVector ({ resourceIdEqToAnd, resourceIdIndexToAnd }), TColumnChainInfo (destResourceId), NKernels::EOperation::And);
420+ auto andNode = AddNode (andProcessor);
421+ AddEdge (andNode.get (), dest, destResourceId);
422+
423+ AddEdge (indexProcNode.get (), andNode.get (), resourceIdIndexToAnd);
424+ AddEdge (condNode, andNode.get (), resourceIdEqToAnd);
425+ ResetProducer (destResourceId, andNode.get ());
426+ return true ;
455427}
456428
457429bool TGraph::IsBoolResultYqlOperator (const NYql::TKernelRequestBuilder::EBinaryOp op) const {
@@ -687,16 +659,16 @@ TConclusionStatus TGraph::Collapse() {
687659 }
688660 }
689661
690- // {
691- // auto conclusion = OptimizeConditionsForHeadersCheck(n.get());
692- // if (conclusion.IsFail()) {
693- // return conclusion;
694- // }
695- // if (*conclusion) {
696- // hasChanges = true;
697- // break;
698- // }
699- // }
662+ // {
663+ // auto conclusion = OptimizeConditionsForHeadersCheck(n.get());
664+ // if (conclusion.IsFail()) {
665+ // return conclusion;
666+ // }
667+ // if (*conclusion) {
668+ // hasChanges = true;
669+ // break;
670+ // }
671+ // }
700672
701673 {
702674 auto conclusion = OptimizeConditionsForStream (n.get ());
0 commit comments