@@ -60,7 +60,6 @@ bool TKqpPlanner::UseMockEmptyPlanner = false;

// Task can allocate extra memory during execution.
// So, we estimate total memory amount required for task as apriori task size multiplied by this constant.
constexpr ui32 MEMORY_ESTIMATION_OVERFLOW = 2;
-constexpr ui32 MAX_NON_PARALLEL_TASKS_EXECUTION_LIMIT = 8;

TKqpPlanner::TKqpPlanner(TKqpPlanner::TArgs&& args)
    : TxId(args.TxId)
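The comment above the surviving constant describes a simple safety formula: required memory = apriori task size x MEMORY_ESTIMATION_OVERFLOW. A minimal self-contained sketch of that estimate (the 512 MiB figure and the helper name are invented for illustration, not taken from the source):

    #include <cstdint>

    constexpr uint32_t MEMORY_ESTIMATION_OVERFLOW = 2;  // mirrors the constant kept above

    // A task estimated at 512 MiB reserves 1 GiB, leaving headroom for
    // extra allocations the task may make during execution.
    uint64_t RequiredMemory(uint64_t aprioriTaskSize) {
        return aprioriTaskSize * MEMORY_ESTIMATION_OVERFLOW;
    }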
@@ -256,9 +255,18 @@ std::unique_ptr<IEventHandle> TKqpPlanner::AssignTasksToNodes() {

    auto localResources = ResourceManager_->GetLocalResources();
    Y_UNUSED(MEMORY_ESTIMATION_OVERFLOW);
+
+    auto placingOptions = ResourceManager_->GetPlacingOptions();
+
+    bool singleNodeExecutionMakeSence = (
+        ResourceEstimations.size() <= placingOptions.MaxNonParallelTasksExecutionLimit ||
+        // all readers are located on one node.
+        TasksPerNode.size() == 1
+    );
+
    if (LocalRunMemoryEst * MEMORY_ESTIMATION_OVERFLOW <= localResources.Memory[NRm::EKqpMemoryPool::ScanQuery] &&
        ResourceEstimations.size() <= localResources.ExecutionUnits &&
-        ResourceEstimations.size() <= MAX_NON_PARALLEL_TASKS_EXECUTION_LIMIT)
+        singleNodeExecutionMakeSence)
    {
        ui64 selfNodeId = ExecuterId.NodeId();
        for (ui64 taskId: ComputeTasks) {
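This hunk replaces the hard-coded cap of 8 tasks with the configurable placingOptions.MaxNonParallelTasksExecutionLimit, and additionally allows single-node execution when every reading task already landed on one node. A self-contained sketch of the predicate — TPlacingOptions and SingleNodeExecutionMakesSense are stand-ins mirroring the fields used above, not the real declarations:

    #include <cstddef>
    #include <cstdint>

    // Stand-in for the options returned by GetPlacingOptions(); field names mirror the diff.
    struct TPlacingOptions {
        uint64_t MaxNonParallelTasksExecutionLimit = 8;
        uint64_t MaxNonParallelTopStageExecutionLimit = 1;
    };

    // Single-node execution makes sense when the query is small enough,
    // or when all reading tasks are already pinned to the same node.
    bool SingleNodeExecutionMakesSense(size_t taskCount, size_t nodesWithTasks,
                                       const TPlacingOptions& opts) {
        return taskCount <= opts.MaxNonParallelTasksExecutionLimit || nodesWithTasks == 1;
    }

Note that the memory and execution-unit guards in the surrounding if still have to pass; only the fixed task-count cap became configurable.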
@@ -293,6 +301,41 @@ std::unique_ptr<IEventHandle> TKqpPlanner::AssignTasksToNodes() {
        return std::make_unique<IEventHandle>(ExecuterId, ExecuterId, ev.Release());
    }

+    std::vector<ui64> deepestTasks;
+    ui64 maxLevel = 0;
+    for (auto& task : TasksGraph.GetTasks()) {
+        // const auto& task = TasksGraph.GetTask(taskId);
+        const auto& stageInfo = TasksGraph.GetStageInfo(task.StageId);
+        const NKqpProto::TKqpPhyStage& stage = stageInfo.Meta.GetStage(stageInfo.Id);
+        const ui64 stageLevel = stage.GetProgram().GetSettings().GetStageLevel();
+
+        if (stageLevel > maxLevel) {
+            maxLevel = stageLevel;
+            deepestTasks.clear();
+        }
+
+        if (stageLevel == maxLevel) {
+            deepestTasks.push_back(task.Id);
+        }
+    }
+
+    THashMap<ui64, ui64> alreadyAssigned;
+    for (auto& [nodeId, tasks] : TasksPerNode) {
+        for (ui64 taskId: tasks) {
+            alreadyAssigned.emplace(taskId, nodeId);
+        }
+    }
+
+    if (deepestTasks.size() <= placingOptions.MaxNonParallelTopStageExecutionLimit) {
+        // looks like the merge / union all connection
+        for (ui64 taskId: deepestTasks) {
+            auto [it, success] = alreadyAssigned.emplace(taskId, ExecuterId.NodeId());
+            if (success) {
+                TasksPerNode[ExecuterId.NodeId()].push_back(taskId);
+            }
+        }
+    }
+
    auto planner = (UseMockEmptyPlanner ? CreateKqpMockEmptyPlanner() : CreateKqpGreedyPlanner()); // KqpMockEmptyPlanner is a mock planner for tests

    auto ctx = TlsActivationContext->AsActorContext();
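The added block pins the tasks of the deepest stage — per the inline comment, likely the producers feeding a final merge / union-all connection — to the executer's node, and records them in alreadyAssigned (moved up from below the Plan call, see the last hunk) so the greedy planner respects those pins. A minimal standalone sketch of the max-level scan; TTaskStub and CollectDeepestTasks are hypothetical names, not the graph's real types:

    #include <cstdint>
    #include <vector>

    struct TTaskStub { uint64_t Id; uint64_t StageLevel; };  // stand-in for TasksGraph's task

    // One pass: track the maximum stage level seen so far and keep only the
    // task ids at that level, resetting the result whenever a deeper level appears.
    std::vector<uint64_t> CollectDeepestTasks(const std::vector<TTaskStub>& tasks) {
        std::vector<uint64_t> deepest;
        uint64_t maxLevel = 0;
        for (const auto& task : tasks) {
            if (task.StageLevel > maxLevel) {
                maxLevel = task.StageLevel;
                deepest.clear();
            }
            if (task.StageLevel == maxLevel) {
                deepest.push_back(task.Id);
            }
        }
        return deepest;
    }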
@@ -309,13 +352,6 @@ std::unique_ptr<IEventHandle> TKqpPlanner::AssignTasksToNodes() {

    auto plan = planner->Plan(ResourcesSnapshot, ResourceEstimations);

-    THashMap<ui64, ui64> alreadyAssigned;
-    for (auto& [nodeId, tasks] : TasksPerNode) {
-        for (ui64 taskId: tasks) {
-            alreadyAssigned.emplace(taskId, nodeId);
-        }
-    }
-
    if (!plan.empty()) {
        for (auto& group : plan) {
            for (ui64 taskId: group.TaskIds) {