Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 41 additions & 16 deletions ydb/core/kqp/executer_actor/kqp_planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -343,33 +343,58 @@ std::unique_ptr<IEventHandle> TKqpPlanner::AssignTasksToNodes() {
planner->SetLogFunc([TxId = TxId, &UserRequestContext = UserRequestContext](TStringBuf msg) { LOG_D(msg); });
}

THashMap<ui64, size_t> nodeIdtoIdx;
for (size_t idx = 0; idx < ResourcesSnapshot.size(); ++idx) {
nodeIdtoIdx[ResourcesSnapshot[idx].nodeid()] = idx;
}

LogMemoryStatistics([TxId = TxId, &UserRequestContext = UserRequestContext](TStringBuf msg) { LOG_D(msg); });

auto plan = planner->Plan(ResourcesSnapshot, ResourceEstimations);
ui64 selfNodeId = ExecuterId.NodeId();
TString selfNodeDC;

if (!plan.empty()) {
for (auto& group : plan) {
for(ui64 taskId: group.TaskIds) {
auto [it, success] = alreadyAssigned.emplace(taskId, group.NodeId);
if (success) {
TasksPerNode[group.NodeId].push_back(taskId);
}
}
TVector<const NKikimrKqp::TKqpNodeResources*> allNodes;
TVector<const NKikimrKqp::TKqpNodeResources*> executerDcNodes;
allNodes.reserve(ResourcesSnapshot.size());

for(auto& snapNode: ResourcesSnapshot) {
const TString& dc = snapNode.GetKqpProxyNodeResources().GetDataCenterId();
if (snapNode.GetNodeId() == selfNodeId) {
selfNodeDC = dc;
break;
}
}

return nullptr;
} else {
for(auto& snapNode: ResourcesSnapshot) {
allNodes.push_back(&snapNode);
if (selfNodeDC == snapNode.GetKqpProxyNodeResources().GetDataCenterId()) {
executerDcNodes.push_back(&snapNode);
}
}

TVector<IKqpPlannerStrategy::TResult> plan;

if (!executerDcNodes.empty() && placingOptions.PreferLocalDatacenterExecution) {
plan = planner->Plan(executerDcNodes, ResourceEstimations);
}

if (plan.empty()) {
plan = planner->Plan(allNodes, ResourceEstimations);
}

if (plan.empty()) {
LogMemoryStatistics([TxId = TxId, &UserRequestContext = UserRequestContext](TStringBuf msg) { LOG_E(msg); });

auto ev = MakeHolder<TEvKqp::TEvAbortExecution>(NYql::NDqProto::StatusIds::PRECONDITION_FAILED,
TStringBuilder() << "Not enough resources to execute query. " << "TraceId: " << UserRequestContext->TraceId);
return std::make_unique<IEventHandle>(ExecuterId, ExecuterId, ev.Release());
}

for (auto& group : plan) {
for(ui64 taskId: group.TaskIds) {
auto [it, success] = alreadyAssigned.emplace(taskId, group.NodeId);
if (success) {
TasksPerNode[group.NodeId].push_back(taskId);
}
}
}

return nullptr;
}

const IKqpGateway::TKqpSnapshot& TKqpPlanner::GetSnapshot() const {
Expand Down
16 changes: 8 additions & 8 deletions ydb/core/kqp/executer_actor/kqp_planner_strategy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,16 +90,16 @@ class TNodesManager {
return result;
}

TNodesManager(const TVector<NKikimrKqp::TKqpNodeResources>& nodeResources) {
TNodesManager(const TVector<const NKikimrKqp::TKqpNodeResources*>& nodeResources) {
for (auto& node : nodeResources) {
if (!node.GetAvailableComputeActors()) {
if (!node->GetAvailableComputeActors()) {
continue;
}
Nodes.emplace_back(TNodeDesc{
node.GetNodeId(),
ActorIdFromProto(node.GetResourceManagerActorId()),
node.GetTotalMemory() - node.GetUsedMemory(),
node.GetAvailableComputeActors(),
node->GetNodeId(),
ActorIdFromProto(node->GetResourceManagerActorId()),
node->GetTotalMemory() - node->GetUsedMemory(),
node->GetAvailableComputeActors(),
{}
});
}
Expand All @@ -111,7 +111,7 @@ class TKqpGreedyPlanner : public IKqpPlannerStrategy {
public:
~TKqpGreedyPlanner() override {}

TVector<TResult> Plan(const TVector<NKikimrKqp::TKqpNodeResources>& nodeResources,
TVector<TResult> Plan(const TVector<const NKikimrKqp::TKqpNodeResources*>& nodeResources,
const TVector<TTaskResourceEstimation>& tasks) override
{
TVector<TResult> result;
Expand Down Expand Up @@ -161,7 +161,7 @@ class TKqpMockEmptyPlanner : public IKqpPlannerStrategy {
public:
~TKqpMockEmptyPlanner() override {}

TVector<TResult> Plan(const TVector<NKikimrKqp::TKqpNodeResources>&,
TVector<TResult> Plan(const TVector<const NKikimrKqp::TKqpNodeResources*>&,
const TVector<TTaskResourceEstimation>&) override
{
return {};
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/executer_actor/kqp_planner_strategy.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class IKqpPlannerStrategy {
TVector<ui64> TaskIds;
};

virtual TVector<TResult> Plan(const TVector<NKikimrKqp::TKqpNodeResources>& nodeResources,
virtual TVector<TResult> Plan(const TVector<const NKikimrKqp::TKqpNodeResources*>& nodeResources,
const TVector<TTaskResourceEstimation>& estimatedResources) = 0;

protected:
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/kqp/rm_service/kqp_rm_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ class TKqpResourceManager : public IKqpResourceManager {
return TPlannerPlacingOptions{
.MaxNonParallelTasksExecutionLimit = MaxNonParallelTasksExecutionLimit.load(),
.MaxNonParallelTopStageExecutionLimit = MaxNonParallelTopStageExecutionLimit.load(),
.PreferLocalDatacenterExecution = PreferLocalDatacenterExecution.load(),
};
}

Expand Down Expand Up @@ -423,6 +424,7 @@ class TKqpResourceManager : public IKqpResourceManager {
QueryMemoryLimit.store(config.GetQueryMemoryLimit());
MaxNonParallelTopStageExecutionLimit.store(config.GetMaxNonParallelTopStageExecutionLimit());
MaxNonParallelTasksExecutionLimit.store(config.GetMaxNonParallelTasksExecutionLimit());
PreferLocalDatacenterExecution.store(config.GetPreferLocalDatacenterExecution());
}

ui32 GetNodeId() override {
Expand Down Expand Up @@ -471,6 +473,7 @@ class TKqpResourceManager : public IKqpResourceManager {
std::atomic<i64> ExternalDataQueryMemory = 0;
std::atomic<ui64> MaxNonParallelTopStageExecutionLimit = 1;
std::atomic<ui64> MaxNonParallelTasksExecutionLimit = 8;
std::atomic<bool> PreferLocalDatacenterExecution = true;

// current state
std::atomic<ui64> LastResourceBrokerTaskId = 0;
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/rm_service/kqp_rm_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ struct TKqpLocalNodeResources {
struct TPlannerPlacingOptions {
ui64 MaxNonParallelTasksExecutionLimit = 8;
ui64 MaxNonParallelTopStageExecutionLimit = 1;
bool PreferLocalDatacenterExecution = true;
};

/// per node singleton with instant API
Expand Down
1 change: 1 addition & 0 deletions ydb/core/protos/table_service_config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ message TTableServiceConfig {

optional uint64 MaxNonParallelTasksExecutionLimit = 25 [default = 8];
optional uint64 MaxNonParallelTopStageExecutionLimit = 26 [default = 1];
optional bool PreferLocalDatacenterExecution = 27 [ default = true ];
}

message TSpillingServiceConfig {
Expand Down
Loading