Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 22 additions & 13 deletions ydb/library/yql/core/services/yql_eval_expr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ class TMarkReachable {
pop = true;
}

if (node.IsCallable({ "EvaluateIf!", "EvaluateFor!" })) {
if (node.IsCallable({ "EvaluateIf!", "EvaluateFor!", "EvaluateParallelFor!" })) {
// scan predicate/list only
if (node.ChildrenSize() > 1) {
CurrentEvalNodes.insert(&node);
Expand Down Expand Up @@ -586,7 +586,8 @@ IGraphTransformer::TStatus EvaluateExpression(const TExprNode::TPtr& input, TExp
}
}

if (node->IsCallable("EvaluateFor!")) {
if (node->IsCallable({"EvaluateFor!", "EvaluateParallelFor!"})) {
const bool seq = node->IsCallable("EvaluateFor!");
if (!EnsureMinArgsCount(*node, 3, ctx)) {
return nullptr;
}
Expand Down Expand Up @@ -643,29 +644,37 @@ IGraphTransformer::TStatus EvaluateExpression(const TExprNode::TPtr& input, TExp
}

auto itemsCount = list->ChildrenSize() - (list->IsCallable("List") ? 1 : 0);
if (itemsCount > types.EvaluateForLimit) {
ctx.AddError(TIssue(ctx.GetPosition(list->Pos()), TStringBuilder() << "Too large list for EVALUATE FOR, allowed: " <<
types.EvaluateForLimit << ", got: " << itemsCount));
const auto limit = seq ? types.EvaluateForLimit : types.EvaluateParallelForLimit;
if (itemsCount > limit) {
ctx.AddError(TIssue(ctx.GetPosition(list->Pos()), TStringBuilder() << "Too large list for EVALUATE " << (seq ? "" : "PARALLEL ") << "FOR, allowed: " <<
limit << ", got: " << itemsCount));
return nullptr;
}

auto world = node->ChildPtr(0);
auto ret = ctx.Builder(node->Pos())
.Callable("Seq!")
.Add(0, world)
.Callable(seq ? "Seq!" : "Sync!")
.Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
ui32 pos = 1;
ui32 pos = 0;
if (seq) {
parent.Add(pos++, world);
}

for (ui32 i = list->IsCallable("List") ? 1 : 0; i < list->ChildrenSize(); ++i) {
auto arg = ctx.NewArgument(node->Pos(), "world");
auto arg = seq ? ctx.NewArgument(node->Pos(), "world") : world;
auto body = ctx.Builder(node->Pos())
.Apply(node->ChildPtr(2))
.With(0, arg)
.With(1, list->ChildPtr(i))
.Seal()
.Build();

auto lambda = ctx.NewLambda(node->Pos(), ctx.NewArguments(node->Pos(), { arg }), std::move(body));
parent.Add(pos++, lambda);
if (seq) {
auto lambda = ctx.NewLambda(node->Pos(), ctx.NewArguments(node->Pos(), { arg }), std::move(body));
parent.Add(pos++, lambda);
} else {
parent.Add(pos++, body);
}
}

return parent;
Expand Down Expand Up @@ -779,9 +788,9 @@ IGraphTransformer::TStatus EvaluateExpression(const TExprNode::TPtr& input, TExp
}

auto itemsCount = list->ChildrenSize();
if (itemsCount > types.EvaluateForLimit) {
if (itemsCount > types.EvaluateParallelForLimit) {
ctx.AddError(TIssue(ctx.GetPosition(list->Pos()), TStringBuilder() << "Too large list for subquery loop, allowed: " <<
types.EvaluateForLimit << ", got: " << itemsCount));
types.EvaluateParallelForLimit << ", got: " << itemsCount));
return nullptr;
}

Expand Down
1 change: 1 addition & 0 deletions ydb/library/yql/core/yql_type_annotation.h
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ struct TTypeAnnotationContext: public TThrRefBase {
THashSet<TString> DisableConstraintCheck;
bool UdfSupportsYield = false;
ui32 EvaluateForLimit = 500;
ui32 EvaluateParallelForLimit = 5000;
ui32 EvaluateOrderByColumnLimit = 100;
bool PullUpFlatMapOverJoin = true;
bool DeprecatedSQL = false;
Expand Down
10 changes: 10 additions & 0 deletions ydb/library/yql/providers/config/yql_config_provider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -657,6 +657,16 @@ namespace {
return false;
}
}
else if (name == "EvaluateParallelForLimit") {
if (args.size() != 1) {
ctx.AddError(TIssue(pos, TStringBuilder() << "Expected 1 argument, but got " << args.size()));
return false;
}
if (!TryFromString(args[0], Types.EvaluateParallelForLimit)) {
ctx.AddError(TIssue(pos, TStringBuilder() << "Expected integer, but got: " << args[0]));
return false;
}
}
else if (name == "DisablePullUpFlatMapOverJoin" || name == "PullUpFlatMapOverJoin") {
if (args.size() != 0) {
ctx.AddError(TIssue(pos, TStringBuilder() << "Expected no arguments, but got " << args.size()));
Expand Down
5 changes: 4 additions & 1 deletion ydb/library/yql/sql/v1/SQLv1.g.in
Original file line number Diff line number Diff line change
Expand Up @@ -801,7 +801,7 @@ define_action_or_subquery_stmt: DEFINE (ACTION|SUBQUERY) bind_parameter LPAREN a
define_action_or_subquery_body: SEMICOLON* (sql_stmt_core (SEMICOLON+ sql_stmt_core)* SEMICOLON*)?;

if_stmt: EVALUATE? IF expr do_stmt (ELSE do_stmt)?;
for_stmt: EVALUATE? FOR bind_parameter IN expr do_stmt (ELSE do_stmt)?;
for_stmt: EVALUATE? PARALLEL? FOR bind_parameter IN expr do_stmt (ELSE do_stmt)?;

table_ref: (cluster_expr DOT)? AT? (table_key | an_id_expr LPAREN (table_arg (COMMA table_arg)* COMMA?)? RPAREN | bind_parameter (LPAREN expr_list? RPAREN)? (VIEW view_name)?) table_hints?;

Expand Down Expand Up @@ -1162,6 +1162,7 @@ keyword_as_compat:
| OMIT
| ONE
| OPTION
| PARALLEL
| PAST
| PATTERN
| PER
Expand Down Expand Up @@ -1315,6 +1316,7 @@ keyword_compat: (
| OTHERS
| OUTER
| OVER
| PARALLEL
| PARTITION
| PASSING
| PASSWORD
Expand Down Expand Up @@ -1663,6 +1665,7 @@ ORDER: O R D E R;
OTHERS: O T H E R S;
OUTER: O U T E R;
OVER: O V E R;
PARALLEL: P A R A L L E L;
PARTITION: P A R T I T I O N;
PASSING: P A S S I N G;
PASSWORD: P A S S W O R D;
Expand Down
1 change: 1 addition & 0 deletions ydb/library/yql/sql/v1/context.h
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,7 @@ namespace NSQLTranslationV1 {
TMaybe<bool> CompactGroupBy;
bool BlockEngineEnable = false;
bool BlockEngineForce = false;
ui64 ParallelModeCount = 0;
};

class TColumnRefScope {
Expand Down
24 changes: 15 additions & 9 deletions ydb/library/yql/sql/v1/format/sql_format.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1254,29 +1254,35 @@ friend struct TStaticData;
void VisitFor(const TRule_for_stmt& msg) {
if (msg.HasBlock1()) {
PosFromToken(msg.GetBlock1().GetToken1());
} else if (msg.HasBlock2()) {
PosFromToken(msg.GetBlock2().GetToken1());
} else {
PosFromToken(msg.GetToken2());
PosFromToken(msg.GetToken3());
}

NewLine();
if (msg.HasBlock1()) {
Visit(msg.GetBlock1());
}

Visit(msg.GetToken2());
Visit(msg.GetRule_bind_parameter3());
Visit(msg.GetToken4());
Visit(msg.GetRule_expr5());
if (msg.HasBlock2()) {
Visit(msg.GetBlock2());
}

Visit(msg.GetToken3());
Visit(msg.GetRule_bind_parameter4());
Visit(msg.GetToken5());
Visit(msg.GetRule_expr6());
NewLine();
PushCurrentIndent();
Visit(msg.GetRule_do_stmt6());
Visit(msg.GetRule_do_stmt7());
PopCurrentIndent();
if (msg.HasBlock7()) {
if (msg.HasBlock8()) {
NewLine();
Visit(msg.GetBlock7().GetToken1());
Visit(msg.GetBlock8().GetToken1());
NewLine();
PushCurrentIndent();
Visit(msg.GetBlock7().GetRule_do_stmt2());
Visit(msg.GetBlock8().GetRule_do_stmt2());
PopCurrentIndent();
}
}
Expand Down
2 changes: 2 additions & 0 deletions ydb/library/yql/sql/v1/format/sql_format_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -582,6 +582,8 @@ Y_UNIT_TEST_SUITE(CheckSqlFormatter) {
"EVALUATE FOR $x IN []\n\tDO BEGIN\n\t\tSELECT\n\t\t\t$x;\n\tEND DO;\n"},
{"evaluate for $x in [] do begin select $x; end do else do begin select 2; end do",
"EVALUATE FOR $x IN []\n\tDO BEGIN\n\t\tSELECT\n\t\t\t$x;\n\tEND DO\nELSE\n\tDO BEGIN\n\t\tSELECT\n\t\t\t2;\n\tEND DO;\n"},
{"evaluate parallel for $x in [] do $a($x)",
"EVALUATE PARALLEL FOR $x IN []\n\tDO $a($x);\n"},
};

TSetup setup;
Expand Down
2 changes: 1 addition & 1 deletion ydb/library/yql/sql/v1/node.h
Original file line number Diff line number Diff line change
Expand Up @@ -1248,7 +1248,7 @@ namespace NSQLTranslationV1 {
TNodePtr BuildPragma(TPosition pos, const TString& prefix, const TString& name, const TVector<TDeferredAtom>& values, bool valueDefault);
TNodePtr BuildSqlLambda(TPosition pos, TVector<TString>&& args, TVector<TNodePtr>&& exprSeq);
TNodePtr BuildWorldIfNode(TPosition pos, TNodePtr predicate, TNodePtr thenNode, TNodePtr elseNode, bool isEvaluate);
TNodePtr BuildWorldForNode(TPosition pos, TNodePtr list, TNodePtr bodyNode, TNodePtr elseNode, bool isEvaluate);
TNodePtr BuildWorldForNode(TPosition pos, TNodePtr list, TNodePtr bodyNode, TNodePtr elseNode, bool isEvaluate, bool isParallel);

TNodePtr BuildCreateTopic(TPosition pos, const TTopicRef& tr, const TCreateTopicParameters& params,
TScopedStatePtr scoped);
Expand Down
10 changes: 6 additions & 4 deletions ydb/library/yql/sql/v1/query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3010,12 +3010,13 @@ TNodePtr BuildWorldIfNode(TPosition pos, TNodePtr predicate, TNodePtr thenNode,

class TWorldFor final : public TAstListNode {
public:
TWorldFor(TPosition pos, TNodePtr list, TNodePtr bodyNode, TNodePtr elseNode, bool isEvaluate)
TWorldFor(TPosition pos, TNodePtr list, TNodePtr bodyNode, TNodePtr elseNode, bool isEvaluate, bool isParallel)
: TAstListNode(pos)
, List(list)
, BodyNode(bodyNode)
, ElseNode(elseNode)
, IsEvaluate(isEvaluate)
, IsParallel(isParallel)
{
FakeSource = BuildFakeSource(pos);
}
Expand All @@ -3024,7 +3025,7 @@ class TWorldFor final : public TAstListNode {
if (!List->Init(ctx, FakeSource.Get())) {
return{};
}
Add(IsEvaluate ? "EvaluateFor!" : "For!");
Add(TStringBuilder() << (IsEvaluate ? "Evaluate": "") << (IsParallel ? "Parallel" : "") << "For!");
Add("world");
Add(IsEvaluate ? Y("EvaluateExpr", List) : List);

Expand Down Expand Up @@ -3052,10 +3053,11 @@ class TWorldFor final : public TAstListNode {
TNodePtr BodyNode;
TNodePtr ElseNode;
bool IsEvaluate;
bool IsParallel;
TSourcePtr FakeSource;
};

TNodePtr BuildWorldForNode(TPosition pos, TNodePtr list, TNodePtr bodyNode, TNodePtr elseNode, bool isEvaluate) {
return new TWorldFor(pos, list, bodyNode, elseNode, isEvaluate);
TNodePtr BuildWorldForNode(TPosition pos, TNodePtr list, TNodePtr bodyNode, TNodePtr elseNode, bool isEvaluate, bool isParallel) {
return new TWorldFor(pos, list, bodyNode, elseNode, isEvaluate, isParallel);
}
} // namespace NSQLTranslationV1
10 changes: 10 additions & 0 deletions ydb/library/yql/sql/v1/sql_query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,11 @@ bool TSqlQuery::Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core&
break;
}
case TRule_sql_stmt_core::kAltSqlStmtCore2: {
if (Ctx.ParallelModeCount > 0) {
Error() << humanStatementName << " statement is not supported in parallel mode";
return false;
}

Ctx.BodyPart();
TSqlSelect select(Ctx, Mode);
TPosition pos;
Expand Down Expand Up @@ -425,6 +430,11 @@ bool TSqlQuery::Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core&
break;
}
case TRule_sql_stmt_core::kAltSqlStmtCore21: {
if (Ctx.ParallelModeCount > 0) {
Error() << humanStatementName << " statement is not supported in parallel mode";
return false;
}

Ctx.BodyPart();
TSqlValues values(Ctx, Mode);
TPosition pos;
Expand Down
21 changes: 15 additions & 6 deletions ydb/library/yql/sql/v1/sql_translation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4196,34 +4196,43 @@ TNodePtr TSqlTranslation::IfStatement(const TRule_if_stmt& stmt) {

TNodePtr TSqlTranslation::ForStatement(const TRule_for_stmt& stmt) {
bool isEvaluate = stmt.HasBlock1();
bool isParallel = stmt.HasBlock2();
TSqlExpression expr(Ctx, Mode);
TString itemArgName;
if (!NamedNodeImpl(stmt.GetRule_bind_parameter3(), itemArgName, *this)) {
if (!NamedNodeImpl(stmt.GetRule_bind_parameter4(), itemArgName, *this)) {
return {};
}
TPosition itemArgNamePos = Ctx.Pos();

auto exprNode = expr.Build(stmt.GetRule_expr5());
auto exprNode = expr.Build(stmt.GetRule_expr6());
if (!exprNode) {
return{};
}

itemArgName = PushNamedAtom(itemArgNamePos, itemArgName);
auto bodyNode = DoStatement(stmt.GetRule_do_stmt6(), true, { itemArgName });
if (isParallel) {
++Ctx.ParallelModeCount;
}

auto bodyNode = DoStatement(stmt.GetRule_do_stmt7(), true, { itemArgName });
if (isParallel) {
--Ctx.ParallelModeCount;
}

PopNamedNode(itemArgName);
if (!bodyNode) {
return{};
}

TNodePtr elseNode;
if (stmt.HasBlock7()) {
elseNode = DoStatement(stmt.GetBlock7().GetRule_do_stmt2(), true);
if (stmt.HasBlock8()) {
elseNode = DoStatement(stmt.GetBlock8().GetRule_do_stmt2(), true);
if (!elseNode) {
return{};
}
}

return BuildWorldForNode(Ctx.Pos(), exprNode, bodyNode, elseNode, isEvaluate);
return BuildWorldForNode(Ctx.Pos(), exprNode, bodyNode, elseNode, isEvaluate, isParallel);
}

bool TSqlTranslation::BindParameterClause(const TRule_bind_parameter& node, TDeferredAtom& result) {
Expand Down
25 changes: 25 additions & 0 deletions ydb/library/yql/tests/sql/dq_file/part8/canondata/result.json
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,31 @@
}
],
"test.test[action-nested_eval-default.txt-Results]": [],
"test.test[action-parallel_for-default.txt-Analyze]": [
{
"checksum": "4a219d8f559a62f5a7a92c0e72dc36fd",
"size": 9893,
"uri": "https://{canondata_backend}/1942415/24f05a669ac876daf55df920342a0776d0748790/resource.tar.gz#test.test_action-parallel_for-default.txt-Analyze_/plan.txt"
},
{
"uri": "file://test.test_action-parallel_for-default.txt-Analyze_/extracted"
}
],
"test.test[action-parallel_for-default.txt-Debug]": [
{
"checksum": "d89d8c979433b118dc38d1383e73cfd4",
"size": 3368,
"uri": "https://{canondata_backend}/1942415/24f05a669ac876daf55df920342a0776d0748790/resource.tar.gz#test.test_action-parallel_for-default.txt-Debug_/opt.yql_patched"
}
],
"test.test[action-parallel_for-default.txt-Plan]": [
{
"checksum": "7f07c14475e1efd8e6d1ec4128c3a930",
"size": 12723,
"uri": "https://{canondata_backend}/1942415/24f05a669ac876daf55df920342a0776d0748790/resource.tar.gz#test.test_action-parallel_for-default.txt-Plan_/plan.txt"
}
],
"test.test[action-parallel_for-default.txt-Results]": [],
"test.test[action-runtime_parse_type-default.txt-Analyze]": [
{
"checksum": "b4dd508a329723c74293d80f0278c705",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
<tmp_path>/program.sql:<main>: Info: Optimization

<tmp_path>/program.sql:<main>: Info: DQ cannot execute the query. Cause: table without statistics
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,20 @@
"uri": "https://{canondata_backend}/1936273/85968a675c17dd0728c8d7ba5fd43bd0b237dc65/resource.tar.gz#test.test_action-insert_after_eval--Plan_/plan.txt"
}
],
"test.test[action-parallel_for-default.txt-Debug]": [
{
"checksum": "6171dfef0e867aef7c8fd5d4eaa3433f",
"size": 2524,
"uri": "https://{canondata_backend}/1937367/c7268da119a08337b1efcf8aaaaf126f474a8b6a/resource.tar.gz#test.test_action-parallel_for-default.txt-Debug_/opt.yql_patched"
}
],
"test.test[action-parallel_for-default.txt-Plan]": [
{
"checksum": "a10014163abc4b56652882ebff80c91d",
"size": 10137,
"uri": "https://{canondata_backend}/1937367/c7268da119a08337b1efcf8aaaaf126f474a8b6a/resource.tar.gz#test.test_action-parallel_for-default.txt-Plan_/plan.txt"
}
],
"test.test[action-subquery_orderby2-default.txt-Debug]": [
{
"checksum": "1179e3cd025108fe0031396c241345ee",
Expand Down
Loading