Skip to content

Commit 1efe749

Browse files
authored
Merge 78b259b into f2672d3
2 parents f2672d3 + 78b259b commit 1efe749

File tree

1 file changed

+94
-23
lines changed

1 file changed

+94
-23
lines changed

ydb/core/kqp/query_compiler/kqp_query_compiler.cpp

Lines changed: 94 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -493,6 +493,93 @@ std::optional<std::pair<TString, TString>> FindOneSecureParam(const TExprNode::T
493493
return *secureParams.begin();
494494
}
495495

496+
TIssues ApplyOverridePlannerSettings(const TString& overridePlannerJson, NKqpProto::TKqpPhyQuery& queryProto) {
497+
TIssues issues;
498+
NJson::TJsonValue jsonNode;
499+
try {
500+
NJson::TJsonReaderConfig jsonConfig;
501+
NJson::ReadJsonTree(overridePlannerJson, &jsonConfig, &jsonNode, true);
502+
if (!jsonNode.IsArray()) {
503+
issues.AddIssue("Expected array json value");
504+
return issues;
505+
}
506+
} catch (const std::exception& e) {
507+
issues.AddIssue(TStringBuilder() << "Failed to parse json: " << e.what());
508+
return issues;
509+
}
510+
511+
const auto extractUint = [](const NJson::TJsonValue& node, ui32* result) -> TString {
512+
if (!node.IsUInteger()) {
513+
return "Expected non negative integer json value";
514+
}
515+
516+
*result = node.GetUIntegerSafe();
517+
return "";
518+
};
519+
520+
THashSet<std::pair<ui32, ui32>> updatedStages;
521+
const auto& jsonArray = jsonNode.GetArray();
522+
for (size_t i = 0; i < jsonArray.size(); ++i) {
523+
const auto& stageOverride = jsonArray[i];
524+
if (!stageOverride.IsMap()) {
525+
issues.AddIssue(TStringBuilder() << "Expected map json value for stage override " << i);
526+
continue;
527+
}
528+
529+
ui32 txId = 0;
530+
ui32 stageId = 0;
531+
std::optional<ui32> tasks;
532+
for (const auto& [key, value] : stageOverride.GetMap()) {
533+
ui32* result = nullptr;
534+
if (key == "tx") {
535+
result = &txId;
536+
} else if (key == "stage") {
537+
result = &stageId;
538+
} else if (key == "tasks") {
539+
tasks = 0;
540+
result = &(*tasks);
541+
} else {
542+
issues.AddIssue(TStringBuilder() << "Unknown key '" << key << "' in stage override " << i);
543+
continue;
544+
}
545+
546+
if (const auto& error = extractUint(value, result)) {
547+
issues.AddIssue(TStringBuilder() << error << " for key '" << key << "' in stage override " << i);
548+
continue;
549+
}
550+
}
551+
552+
if (!updatedStages.emplace(txId, stageId).second) {
553+
issues.AddIssue(TStringBuilder() << "Duplicate stage override " << i << " for tx " << txId << " and stage " << stageId);
554+
continue;
555+
}
556+
557+
if (!tasks) {
558+
issues.AddIssue(TStringBuilder() << "Missing stage settings for tx " << txId << " and stage " << stageId << " in stage override " << i);
559+
continue;
560+
}
561+
562+
auto& txs = *queryProto.MutableTransactions();
563+
if (txId >= static_cast<ui32>(txs.size())) {
564+
issues.AddIssue(TStringBuilder() << "Invalid tx id: " << txId << " in stage override " << i << ", number of transactions in query: " << txs.size());
565+
continue;
566+
}
567+
568+
auto& stages = *txs[txId].MutableStages();
569+
if (stageId >= static_cast<ui32>(stages.size())) {
570+
issues.AddIssue(TStringBuilder() << "Invalid stage id: " << stageId << " in stage override " << i << ", number of stages in transaction " << txId << ": " << stages.size());
571+
continue;
572+
}
573+
574+
auto& stage = stages[stageId];
575+
if (tasks) {
576+
stage.SetTaskCount(*tasks);
577+
}
578+
}
579+
580+
return issues;
581+
}
582+
496583
class TKqpQueryCompiler : public IKqpQueryCompiler {
497584
public:
498585
TKqpQueryCompiler(const TString& cluster, const TIntrusivePtr<TKikimrTablesData> tablesData,
@@ -545,30 +632,14 @@ class TKqpQueryCompiler : public IKqpQueryCompiler {
545632
CompileTransaction(tx, *queryProto.AddTransactions(), ctx);
546633
}
547634

548-
auto overridePlanner = Config->OverridePlanner.Get();
549-
if (overridePlanner) {
550-
NJson::TJsonReaderConfig jsonConfig;
551-
NJson::TJsonValue jsonNode;
552-
if (NJson::ReadJsonTree(*overridePlanner, &jsonConfig, &jsonNode)) {
553-
for (auto& stageOverride : jsonNode.GetArray()) {
554-
ui32 txId = 0;
555-
if (auto* txNode = stageOverride.GetValueByPath("tx")) {
556-
txId = txNode->GetIntegerSafe();
557-
}
558-
if (txId < static_cast<ui32>(queryProto.GetTransactions().size())) {
559-
auto& tx = *queryProto.MutableTransactions(txId);
560-
ui32 stageId = 0;
561-
if (auto* stageNode = stageOverride.GetValueByPath("stage")) {
562-
stageId = stageNode->GetIntegerSafe();
563-
}
564-
if (stageId < static_cast<ui32>(tx.GetStages().size())) {
565-
auto& stage = *tx.MutableStages(stageId);
566-
if (auto* tasksNode = stageOverride.GetValueByPath("tasks")) {
567-
stage.SetTaskCount(tasksNode->GetIntegerSafe());
568-
}
569-
}
570-
}
635+
if (const auto overridePlanner = Config->OverridePlanner.Get()) {
636+
if (const auto& issues = ApplyOverridePlannerSettings(*overridePlanner, queryProto)) {
637+
NYql::TIssue rootIssue("Invalid override planner settings");
638+
rootIssue.SetCode(NYql::DEFAULT_ERROR, NYql::TSeverityIds::S_INFO);
639+
for (auto issue : issues) {
640+
rootIssue.AddSubIssue(MakeIntrusive<NYql::TIssue>(issue.SetCode(NYql::DEFAULT_ERROR, NYql::TSeverityIds::S_INFO)));
571641
}
642+
ctx.AddError(rootIssue);
572643
}
573644
}
574645

0 commit comments

Comments
 (0)