Skip to content

Commit ac286c8

Browse files
authored
[dq] Validate self join with map strategy (#4613)
1 parent e16ace1 commit ac286c8

File tree

6 files changed

+107
-35
lines changed

6 files changed

+107
-35
lines changed

ydb/library/yql/providers/dq/common/yql_dq_settings.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ struct TDqSettings {
5959
static constexpr ui32 CostBasedOptimizationLevel = 0;
6060
static constexpr ui32 MaxDPccpDPTableSize = 16400U;
6161
static constexpr ui64 MaxAttachmentsSize = 2_GB;
62+
static constexpr bool SplitStageOnDqReplicate = true;
6263
};
6364

6465
using TPtr = std::shared_ptr<TDqSettings>;

ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -125,8 +125,8 @@ class TLocalExecutor: public TCounters
125125
: State->RandomProvider;
126126

127127
TScopedAlloc alloc(
128-
__LOCATION__,
129-
NKikimr::TAlignedPagePoolCounters(),
128+
__LOCATION__,
129+
NKikimr::TAlignedPagePoolCounters(),
130130
State->FunctionRegistry->SupportsSizedAllocators(),
131131
false);
132132
NDq::TDqTaskRunnerContext executionContext;
@@ -287,7 +287,7 @@ struct TDqsPipelineConfigurator : public IPipelineConfigurator {
287287
}
288288
NDq::EChannelMode mode = GetConfiguredChannelMode(State_, typesCtx);
289289
pipeline->Add(
290-
NDq::CreateDqBuildPhyStagesTransformer(!State_->Settings->SplitStageOnDqReplicate.Get().GetOrElse(true), typesCtx, mode),
290+
NDq::CreateDqBuildPhyStagesTransformer(!State_->Settings->SplitStageOnDqReplicate.Get().GetOrElse(TDqSettings::TDefault::SplitStageOnDqReplicate), typesCtx, mode),
291291
"BuildPhy");
292292
pipeline->Add(NDqs::CreateDqsRewritePhyCallablesTransformer(*pipeline->GetTypeAnnotationContext()), "RewritePhyCallables");
293293
}

ydb/library/yql/providers/dq/provider/yql_dq_validate.cpp

Lines changed: 59 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,10 @@ class TDqExecutionValidator {
2525
ctx.AddError(YqlIssue(ctx.GetPosition(where.Pos()), TIssuesIds::DQ_OPTIMIZE_ERROR, err));
2626
}
2727

28-
bool ValidateDqStage(const TExprNode& node) {
28+
bool ValidateDqStage(const TExprNode& node, TNodeSet* visitedStages) {
29+
if (visitedStages) {
30+
visitedStages->insert(&node);
31+
}
2932
if (!Visited_.insert(&node).second) {
3033
return true;
3134
}
@@ -36,11 +39,16 @@ class TDqExecutionValidator {
3639
ReportError(Ctx_, *bad, TStringBuilder() << "Cannot execute " << bad->Content() << " over stream/flow inside DQ stage");
3740
}
3841

42+
43+
bool hasMapJoin = false;
3944
VisitExpr(TDqStageBase(&node).Program().Body().Ptr(),
4045
[](const TExprNode::TPtr& n) {
4146
return !TDqConnection::Match(n.Get()) && !TDqPhyPrecompute::Match(n.Get()) && !TDqReadWrapBase::Match(n.Get());
4247
},
43-
[&readPerProvider_ = ReadsPerProvider_, &hasErrors, &ctx = Ctx_, &typeCtx = TypeCtx_](const TExprNode::TPtr& n) {
48+
[&readPerProvider_ = ReadsPerProvider_, &hasErrors, &hasMapJoin, &ctx = Ctx_, &typeCtx = TypeCtx_](const TExprNode::TPtr& n) {
49+
if (TDqPhyMapJoin::Match(n.Get())) {
50+
hasMapJoin = true;
51+
}
4452
if (TCoScriptUdf::Match(n.Get()) && NKikimr::NMiniKQL::IsSystemPython(NKikimr::NMiniKQL::ScriptTypeFromStr(n->Head().Content()))) {
4553
ReportError(ctx, *n, TStringBuilder() << "Cannot execute system python udf " << n->Content() << " in DQ");
4654
hasErrors = true;
@@ -60,28 +68,48 @@ class TDqExecutionValidator {
6068
}
6169
);
6270

63-
for (auto n: TDqStageBase(&node).Inputs()) {
64-
hasErrors |= !ValidateDqNode(n.Ref());
71+
HasMapJoin_ |= hasMapJoin;
72+
if (hasMapJoin && CheckSelfMapJoin_) {
73+
TNodeSet unitedVisitedStages;
74+
bool nonUniqStages = false;
75+
for (auto n: TDqStageBase(&node).Inputs()) {
76+
TNodeSet inputVisitedStages;
77+
hasErrors |= !ValidateDqNode(n.Ref(), &inputVisitedStages);
78+
const size_t expectedSize = unitedVisitedStages.size() + inputVisitedStages.size();
79+
unitedVisitedStages.insert(inputVisitedStages.begin(), inputVisitedStages.end());
80+
nonUniqStages |= (expectedSize != unitedVisitedStages.size()); // Found duplicates - some stage was visited twice from different inputs
81+
}
82+
if (nonUniqStages) {
83+
ReportError(Ctx_, node, TStringBuilder() << "Cannot execute self join using mapjoin strategy in DQ");
84+
hasErrors = true;
85+
}
86+
if (visitedStages) {
87+
visitedStages->insert(unitedVisitedStages.begin(), unitedVisitedStages.end());
88+
}
89+
} else {
90+
for (auto n: TDqStageBase(&node).Inputs()) {
91+
hasErrors |= !ValidateDqNode(n.Ref(), visitedStages);
92+
}
6593
}
6694

6795
if (auto outs = TDqStageBase(&node).Outputs()) {
6896
for (auto n: outs.Cast()) {
69-
hasErrors |= !ValidateDqNode(n.Ref());
97+
hasErrors |= !ValidateDqNode(n.Ref(), nullptr);
7098
}
7199
}
72100

73101
return !hasErrors;
74102

75103
}
76104

77-
bool ValidateDqNode(const TExprNode& node) {
105+
bool ValidateDqNode(const TExprNode& node, TNodeSet* visitedStages) {
78106
if (node.GetState() == TExprNode::EState::ExecutionComplete) {
79107
return true;
80108
}
81109

82110
if (TDqStageBase::Match(&node)) {
83111
// visited will be updated inside ValidateDqStage
84-
return ValidateDqStage(node);
112+
return ValidateDqStage(node, visitedStages);
85113
}
86114

87115
if (!Visited_.insert(&node).second) {
@@ -94,10 +122,10 @@ class TDqExecutionValidator {
94122
}
95123

96124
if (TDqConnection::Match(&node)) {
97-
return ValidateDqStage(TDqConnection(&node).Output().Stage().Ref());
125+
return ValidateDqStage(TDqConnection(&node).Output().Stage().Ref(), TDqCnValue::Match(&node) ? nullptr : visitedStages);
98126
}
99127
if (TDqPhyPrecompute::Match(&node)) {
100-
return ValidateDqNode(TDqPhyPrecompute(&node).Connection().Ref());
128+
return ValidateDqNode(TDqPhyPrecompute(&node).Connection().Ref(), nullptr);
101129
}
102130

103131
if (TDqSource::Match(&node) || TDqTransform::Match(&node) || TDqSink::Match(&node)) {
@@ -113,14 +141,16 @@ class TDqExecutionValidator {
113141
: TypeCtx_(typeCtx)
114142
, Ctx_(ctx)
115143
, State_(state)
144+
, CheckSelfMapJoin_(!TypeCtx_.ForceDq
145+
&& !State_->Settings->SplitStageOnDqReplicate.Get().GetOrElse(TDqSettings::TDefault::SplitStageOnDqReplicate)
146+
&& !State_->Settings->IsSpillingEnabled())
116147
{}
117148

118149
bool ValidateDqExecution(const TExprNode& node) {
119150
YQL_LOG_CTX_SCOPE(__FUNCTION__);
120151

121152
TNodeSet dqNodes;
122153

123-
bool hasJoin = false;
124154
if (TDqCnResult::Match(&node)) {
125155
dqNodes.insert(TDqCnResult(&node).Output().Stage().Raw());
126156
} else if (TDqQuery::Match(&node)) {
@@ -142,47 +172,44 @@ class TDqExecutionValidator {
142172
});
143173
}
144174

145-
VisitExpr(node, [&hasJoin](const TExprNode& n) {
146-
if (TMaybeNode<TDqPhyMapJoin>(&n)) {
147-
hasJoin = true;
148-
}
149-
return true;
150-
});
151-
152175
bool hasError = false;
153176

154177
for (const auto n: dqNodes) {
155-
hasError |= !ValidateDqNode(*n);
178+
hasError |= !ValidateDqNode(*n, nullptr);
156179
if (hasError) {
157180
break;
158181
}
159182
}
160183

161-
for (auto& [integration, nodes]: ReadsPerProvider_) {
162-
TMaybe<ui64> size;
163-
hasError |= !(size = integration->EstimateReadSize(State_->Settings->DataSizePerJob.Get().GetOrElse(TDqSettings::TDefault::DataSizePerJob),
164-
State_->Settings->MaxTasksPerStage.Get().GetOrElse(TDqSettings::TDefault::MaxTasksPerStage), nodes, Ctx_));
165-
if (hasError) {
166-
break;
184+
if (!hasError && HasMapJoin_ && !TypeCtx_.ForceDq) {
185+
size_t dataSize = 0;
186+
for (auto& [integration, nodes]: ReadsPerProvider_) {
187+
TMaybe<ui64> size;
188+
hasError |= !(size = integration->EstimateReadSize(State_->Settings->DataSizePerJob.Get().GetOrElse(TDqSettings::TDefault::DataSizePerJob),
189+
State_->Settings->MaxTasksPerStage.Get().GetOrElse(TDqSettings::TDefault::MaxTasksPerStage), nodes, Ctx_));
190+
if (hasError) {
191+
break;
192+
}
193+
dataSize += *size;
167194
}
168-
DataSize_ += *size;
169-
}
170195

171-
if (!hasError && hasJoin && DataSize_ > State_->Settings->MaxDataSizePerQuery.Get().GetOrElse(10_GB)) {
172-
ReportError(Ctx_, node, TStringBuilder() << "too big join input: " << DataSize_);
173-
return false;
196+
if (dataSize > State_->Settings->MaxDataSizePerQuery.Get().GetOrElse(10_GB)) {
197+
ReportError(Ctx_, node, TStringBuilder() << "too big join input: " << dataSize);
198+
return false;
199+
}
174200
}
175201
return !hasError;
176202
}
177203
private:
178204

179205
const TTypeAnnotationContext& TypeCtx_;
180206
TExprContext& Ctx_;
181-
TNodeSet Visited_;
182-
THashMap<IDqIntegration*, TVector<const TExprNode*>> ReadsPerProvider_;
183-
size_t DataSize_ = 0;
184207
const TDqState::TPtr State_;
208+
const bool CheckSelfMapJoin_;
185209

210+
TNodeSet Visited_;
211+
THashMap<IDqIntegration*, TVector<const TExprNode*>> ReadsPerProvider_;
212+
bool HasMapJoin_ = false;
186213
};
187214
}
188215

ydb/library/yql/tests/sql/dq_file/part3/canondata/result.json

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1347,6 +1347,37 @@
13471347
"uri": "https://{canondata_backend}/1871102/8fb53a3a81ad5d5949727846153c9f6f58a0845e/resource.tar.gz#test.test_join-selfjoin_on_sorted_with_filter-off-Results_/results.txt"
13481348
}
13491349
],
1350+
"test.test[join-selfjoin_on_sorted_with_filter-replicate-Analyze]": [
1351+
{
1352+
"checksum": "b59f1264995b5b377a58a392fc1d87c8",
1353+
"size": 3938,
1354+
"uri": "https://{canondata_backend}/1917492/064a3289ad6eaf99ba9f2a34e99fb15ca8194278/resource.tar.gz#test.test_join-selfjoin_on_sorted_with_filter-replicate-Analyze_/plan.txt"
1355+
},
1356+
{
1357+
"uri": "file://test.test_join-selfjoin_on_sorted_with_filter-replicate-Analyze_/extracted"
1358+
}
1359+
],
1360+
"test.test[join-selfjoin_on_sorted_with_filter-replicate-Debug]": [
1361+
{
1362+
"checksum": "5d13bd670d234e8cc6261784c84e9012",
1363+
"size": 2195,
1364+
"uri": "https://{canondata_backend}/1784826/3baf99fc0c22227fef7f1b91df73370c2e22f014/resource.tar.gz#test.test_join-selfjoin_on_sorted_with_filter-replicate-Debug_/opt.yql_patched"
1365+
}
1366+
],
1367+
"test.test[join-selfjoin_on_sorted_with_filter-replicate-Plan]": [
1368+
{
1369+
"checksum": "db2d64e1503f3bfa45bc79d2d1655935",
1370+
"size": 5087,
1371+
"uri": "https://{canondata_backend}/1917492/064a3289ad6eaf99ba9f2a34e99fb15ca8194278/resource.tar.gz#test.test_join-selfjoin_on_sorted_with_filter-replicate-Plan_/plan.txt"
1372+
}
1373+
],
1374+
"test.test[join-selfjoin_on_sorted_with_filter-replicate-Results]": [
1375+
{
1376+
"checksum": "568f3e7e0db9008acecc09f8942dd3c2",
1377+
"size": 3076,
1378+
"uri": "https://{canondata_backend}/1917492/064a3289ad6eaf99ba9f2a34e99fb15ca8194278/resource.tar.gz#test.test_join-selfjoin_on_sorted_with_filter-replicate-Results_/results.txt"
1379+
}
1380+
],
13501381
"test.test[join-three_equalities_paren--Analyze]": [
13511382
{
13521383
"checksum": "da428dcd6823eacaf44ce47e2c9951b9",
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
<tmp_path>/program.sql:<main>: Info: DQ cannot execute the query
2+
3+
<tmp_path>/program.sql:<main>: Info: Optimization
4+
5+
<tmp_path>/program.sql:<main>:7:22: Info: Cannot execute self join using mapjoin strategy in DQ
6+
select * from $in as a inner join $in as b on a.key = b.key;
7+
^
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
in Input sorted_uniq.txt
2+
3+
providers dq
4+
pragma dq.HashJoinMode="off";
5+
pragma dq.SplitStageOnDqReplicate="false";
6+
pragma dq.SpillingEngine="disable";

0 commit comments

Comments
 (0)