@@ -25,7 +25,10 @@ class TDqExecutionValidator {
2525 ctx.AddError (YqlIssue (ctx.GetPosition (where.Pos ()), TIssuesIds::DQ_OPTIMIZE_ERROR, err));
2626 }
2727
28- bool ValidateDqStage (const TExprNode& node) {
28+ bool ValidateDqStage (const TExprNode& node, TNodeSet* visitedStages) {
29+ if (visitedStages) {
30+ visitedStages->insert (&node);
31+ }
2932 if (!Visited_.insert (&node).second ) {
3033 return true ;
3134 }
@@ -36,11 +39,16 @@ class TDqExecutionValidator {
3639 ReportError (Ctx_, *bad, TStringBuilder () << " Cannot execute " << bad->Content () << " over stream/flow inside DQ stage" );
3740 }
3841
42+
43+ bool hasMapJoin = false ;
3944 VisitExpr (TDqStageBase (&node).Program ().Body ().Ptr (),
4045 [](const TExprNode::TPtr& n) {
4146 return !TDqConnection::Match (n.Get ()) && !TDqPhyPrecompute::Match (n.Get ()) && !TDqReadWrapBase::Match (n.Get ());
4247 },
43- [&readPerProvider_ = ReadsPerProvider_, &hasErrors, &ctx = Ctx_, &typeCtx = TypeCtx_](const TExprNode::TPtr& n) {
48+ [&readPerProvider_ = ReadsPerProvider_, &hasErrors, &hasMapJoin, &ctx = Ctx_, &typeCtx = TypeCtx_](const TExprNode::TPtr& n) {
49+ if (TDqPhyMapJoin::Match (n.Get ())) {
50+ hasMapJoin = true ;
51+ }
4452 if (TCoScriptUdf::Match (n.Get ()) && NKikimr::NMiniKQL::IsSystemPython (NKikimr::NMiniKQL::ScriptTypeFromStr (n->Head ().Content ()))) {
4553 ReportError (ctx, *n, TStringBuilder () << " Cannot execute system python udf " << n->Content () << " in DQ" );
4654 hasErrors = true ;
@@ -60,28 +68,48 @@ class TDqExecutionValidator {
6068 }
6169 );
6270
63- for (auto n: TDqStageBase (&node).Inputs ()) {
64- hasErrors |= !ValidateDqNode (n.Ref ());
71+ HasMapJoin_ |= hasMapJoin;
72+ if (hasMapJoin && CheckSelfMapJoin_) {
73+ TNodeSet unitedVisitedStages;
74+ bool nonUniqStages = false ;
75+ for (auto n: TDqStageBase (&node).Inputs ()) {
76+ TNodeSet inputVisitedStages;
77+ hasErrors |= !ValidateDqNode (n.Ref (), &inputVisitedStages);
78+ const size_t expectedSize = unitedVisitedStages.size () + inputVisitedStages.size ();
79+ unitedVisitedStages.insert (inputVisitedStages.begin (), inputVisitedStages.end ());
80+ nonUniqStages |= (expectedSize != unitedVisitedStages.size ()); // Found duplicates - some stage was visited twice from different inputs
81+ }
82+ if (nonUniqStages) {
83+ ReportError (Ctx_, node, TStringBuilder () << " Cannot execute self join using mapjoin strategy in DQ" );
84+ hasErrors = true ;
85+ }
86+ if (visitedStages) {
87+ visitedStages->insert (unitedVisitedStages.begin (), unitedVisitedStages.end ());
88+ }
89+ } else {
90+ for (auto n: TDqStageBase (&node).Inputs ()) {
91+ hasErrors |= !ValidateDqNode (n.Ref (), visitedStages);
92+ }
6593 }
6694
6795 if (auto outs = TDqStageBase (&node).Outputs ()) {
6896 for (auto n: outs.Cast ()) {
69- hasErrors |= !ValidateDqNode (n.Ref ());
97+ hasErrors |= !ValidateDqNode (n.Ref (), nullptr );
7098 }
7199 }
72100
73101 return !hasErrors;
74102
75103 }
76104
77- bool ValidateDqNode (const TExprNode& node) {
105+ bool ValidateDqNode (const TExprNode& node, TNodeSet* visitedStages ) {
78106 if (node.GetState () == TExprNode::EState::ExecutionComplete) {
79107 return true ;
80108 }
81109
82110 if (TDqStageBase::Match (&node)) {
83111 // visited will be updated inside ValidateDqStage
84- return ValidateDqStage (node);
112+ return ValidateDqStage (node, visitedStages );
85113 }
86114
87115 if (!Visited_.insert (&node).second ) {
@@ -94,10 +122,10 @@ class TDqExecutionValidator {
94122 }
95123
96124 if (TDqConnection::Match (&node)) {
97- return ValidateDqStage (TDqConnection (&node).Output ().Stage ().Ref ());
125+ return ValidateDqStage (TDqConnection (&node).Output ().Stage ().Ref (), TDqCnValue::Match (&node) ? nullptr : visitedStages );
98126 }
99127 if (TDqPhyPrecompute::Match (&node)) {
100- return ValidateDqNode (TDqPhyPrecompute (&node).Connection ().Ref ());
128+ return ValidateDqNode (TDqPhyPrecompute (&node).Connection ().Ref (), nullptr );
101129 }
102130
103131 if (TDqSource::Match (&node) || TDqTransform::Match (&node) || TDqSink::Match (&node)) {
@@ -113,14 +141,16 @@ class TDqExecutionValidator {
113141 : TypeCtx_(typeCtx)
114142 , Ctx_(ctx)
115143 , State_(state)
144+ , CheckSelfMapJoin_(!TypeCtx_.ForceDq
145+ && !State_->Settings->SplitStageOnDqReplicate.Get().GetOrElse(TDqSettings::TDefault::SplitStageOnDqReplicate)
146+ && !State_->Settings->IsSpillingEnabled ())
116147 {}
117148
118149 bool ValidateDqExecution (const TExprNode& node) {
119150 YQL_LOG_CTX_SCOPE (__FUNCTION__);
120151
121152 TNodeSet dqNodes;
122153
123- bool hasJoin = false ;
124154 if (TDqCnResult::Match (&node)) {
125155 dqNodes.insert (TDqCnResult (&node).Output ().Stage ().Raw ());
126156 } else if (TDqQuery::Match (&node)) {
@@ -142,47 +172,44 @@ class TDqExecutionValidator {
142172 });
143173 }
144174
145- VisitExpr (node, [&hasJoin](const TExprNode& n) {
146- if (TMaybeNode<TDqPhyMapJoin>(&n)) {
147- hasJoin = true ;
148- }
149- return true ;
150- });
151-
152175 bool hasError = false ;
153176
154177 for (const auto n: dqNodes) {
155- hasError |= !ValidateDqNode (*n);
178+ hasError |= !ValidateDqNode (*n, nullptr );
156179 if (hasError) {
157180 break ;
158181 }
159182 }
160183
161- for (auto & [integration, nodes]: ReadsPerProvider_) {
162- TMaybe<ui64> size;
163- hasError |= !(size = integration->EstimateReadSize (State_->Settings ->DataSizePerJob .Get ().GetOrElse (TDqSettings::TDefault::DataSizePerJob),
164- State_->Settings ->MaxTasksPerStage .Get ().GetOrElse (TDqSettings::TDefault::MaxTasksPerStage), nodes, Ctx_));
165- if (hasError) {
166- break ;
184+ if (!hasError && HasMapJoin_ && !TypeCtx_.ForceDq ) {
185+ size_t dataSize = 0 ;
186+ for (auto & [integration, nodes]: ReadsPerProvider_) {
187+ TMaybe<ui64> size;
188+ hasError |= !(size = integration->EstimateReadSize (State_->Settings ->DataSizePerJob .Get ().GetOrElse (TDqSettings::TDefault::DataSizePerJob),
189+ State_->Settings ->MaxTasksPerStage .Get ().GetOrElse (TDqSettings::TDefault::MaxTasksPerStage), nodes, Ctx_));
190+ if (hasError) {
191+ break ;
192+ }
193+ dataSize += *size;
167194 }
168- DataSize_ += *size;
169- }
170195
171- if (!hasError && hasJoin && DataSize_ > State_->Settings ->MaxDataSizePerQuery .Get ().GetOrElse (10_GB)) {
172- ReportError (Ctx_, node, TStringBuilder () << " too big join input: " << DataSize_);
173- return false ;
196+ if (dataSize > State_->Settings ->MaxDataSizePerQuery .Get ().GetOrElse (10_GB)) {
197+ ReportError (Ctx_, node, TStringBuilder () << " too big join input: " << dataSize);
198+ return false ;
199+ }
174200 }
175201 return !hasError;
176202 }
177203private:
178204
179205 const TTypeAnnotationContext& TypeCtx_;
180206 TExprContext& Ctx_;
181- TNodeSet Visited_;
182- THashMap<IDqIntegration*, TVector<const TExprNode*>> ReadsPerProvider_;
183- size_t DataSize_ = 0 ;
184207 const TDqState::TPtr State_;
208+ const bool CheckSelfMapJoin_;
185209
210+ TNodeSet Visited_;
211+ THashMap<IDqIntegration*, TVector<const TExprNode*>> ReadsPerProvider_;
212+ bool HasMapJoin_ = false ;
186213};
187214}
188215
0 commit comments