Skip to content

Added OverridePlanner validation #19945

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 94 additions & 23 deletions ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,93 @@ std::optional<std::pair<TString, TString>> FindOneSecureParam(const TExprNode::T
return *secureParams.begin();
}

TIssues ApplyOverridePlannerSettings(const TString& overridePlannerJson, NKqpProto::TKqpPhyQuery& queryProto) {
TIssues issues;
NJson::TJsonValue jsonNode;
try {
NJson::TJsonReaderConfig jsonConfig;
NJson::ReadJsonTree(overridePlannerJson, &jsonConfig, &jsonNode, true);
if (!jsonNode.IsArray()) {
issues.AddIssue("Expected array json value");
return issues;
}
} catch (const std::exception& e) {
issues.AddIssue(TStringBuilder() << "Failed to parse json: " << e.what());
return issues;
}

const auto extractUint = [](const NJson::TJsonValue& node, ui32* result) -> TString {
if (!node.IsUInteger()) {
return "Expected non negative integer json value";
}

*result = node.GetUIntegerSafe();
return "";
};

THashSet<std::pair<ui32, ui32>> updatedStages;
const auto& jsonArray = jsonNode.GetArray();
for (size_t i = 0; i < jsonArray.size(); ++i) {
const auto& stageOverride = jsonArray[i];
if (!stageOverride.IsMap()) {
issues.AddIssue(TStringBuilder() << "Expected map json value for stage override " << i);
continue;
}

ui32 txId = 0;
ui32 stageId = 0;
std::optional<ui32> tasks;
for (const auto& [key, value] : stageOverride.GetMap()) {
ui32* result = nullptr;
if (key == "tx") {
result = &txId;
} else if (key == "stage") {
result = &stageId;
} else if (key == "tasks") {
tasks = 0;
result = &(*tasks);
} else {
issues.AddIssue(TStringBuilder() << "Unknown key '" << key << "' in stage override " << i);
continue;
}

if (const auto& error = extractUint(value, result)) {
issues.AddIssue(TStringBuilder() << error << " for key '" << key << "' in stage override " << i);
continue;
}
}

if (!updatedStages.emplace(txId, stageId).second) {
issues.AddIssue(TStringBuilder() << "Duplicate stage override " << i << " for tx " << txId << " and stage " << stageId);
continue;
}

if (!tasks) {
issues.AddIssue(TStringBuilder() << "Missing stage settings for tx " << txId << " and stage " << stageId << " in stage override " << i);
continue;
}

auto& txs = *queryProto.MutableTransactions();
if (txId >= static_cast<ui32>(txs.size())) {
issues.AddIssue(TStringBuilder() << "Invalid tx id: " << txId << " in stage override " << i << ", number of transactions in query: " << txs.size());
continue;
}

auto& stages = *txs[txId].MutableStages();
if (stageId >= static_cast<ui32>(stages.size())) {
issues.AddIssue(TStringBuilder() << "Invalid stage id: " << stageId << " in stage override " << i << ", number of stages in transaction " << txId << ": " << stages.size());
continue;
}

auto& stage = stages[stageId];
if (tasks) {
stage.SetTaskCount(*tasks);
}
}

return issues;
}

class TKqpQueryCompiler : public IKqpQueryCompiler {
public:
TKqpQueryCompiler(const TString& cluster, const TIntrusivePtr<TKikimrTablesData> tablesData,
Expand Down Expand Up @@ -545,30 +632,14 @@ class TKqpQueryCompiler : public IKqpQueryCompiler {
CompileTransaction(tx, *queryProto.AddTransactions(), ctx);
}

auto overridePlanner = Config->OverridePlanner.Get();
if (overridePlanner) {
NJson::TJsonReaderConfig jsonConfig;
NJson::TJsonValue jsonNode;
if (NJson::ReadJsonTree(*overridePlanner, &jsonConfig, &jsonNode)) {
for (auto& stageOverride : jsonNode.GetArray()) {
ui32 txId = 0;
if (auto* txNode = stageOverride.GetValueByPath("tx")) {
txId = txNode->GetIntegerSafe();
}
if (txId < static_cast<ui32>(queryProto.GetTransactions().size())) {
auto& tx = *queryProto.MutableTransactions(txId);
ui32 stageId = 0;
if (auto* stageNode = stageOverride.GetValueByPath("stage")) {
stageId = stageNode->GetIntegerSafe();
}
if (stageId < static_cast<ui32>(tx.GetStages().size())) {
auto& stage = *tx.MutableStages(stageId);
if (auto* tasksNode = stageOverride.GetValueByPath("tasks")) {
stage.SetTaskCount(tasksNode->GetIntegerSafe());
}
}
}
if (const auto overridePlanner = Config->OverridePlanner.Get()) {
if (const auto& issues = ApplyOverridePlannerSettings(*overridePlanner, queryProto)) {
NYql::TIssue rootIssue("Invalid override planner settings");
rootIssue.SetCode(NYql::DEFAULT_ERROR, NYql::TSeverityIds::S_INFO);
for (auto issue : issues) {
rootIssue.AddSubIssue(MakeIntrusive<NYql::TIssue>(issue.SetCode(NYql::DEFAULT_ERROR, NYql::TSeverityIds::S_INFO)));
}
ctx.AddError(rootIssue);
}
}

Expand Down
Loading