@@ -343,33 +343,58 @@ std::unique_ptr<IEventHandle> TKqpPlanner::AssignTasksToNodes() {
343
343
planner->SetLogFunc ([TxId = TxId, &UserRequestContext = UserRequestContext](TStringBuf msg) { LOG_D (msg); });
344
344
}
345
345
346
- THashMap<ui64, size_t > nodeIdtoIdx;
347
- for (size_t idx = 0 ; idx < ResourcesSnapshot.size (); ++idx) {
348
- nodeIdtoIdx[ResourcesSnapshot[idx].nodeid ()] = idx;
349
- }
350
-
351
346
LogMemoryStatistics ([TxId = TxId, &UserRequestContext = UserRequestContext](TStringBuf msg) { LOG_D (msg); });
352
347
353
- auto plan = planner->Plan (ResourcesSnapshot, ResourceEstimations);
348
+ ui64 selfNodeId = ExecuterId.NodeId ();
349
+ TString selfNodeDC;
354
350
355
- if (!plan.empty ()) {
356
- for (auto & group : plan) {
357
- for (ui64 taskId: group.TaskIds ) {
358
- auto [it, success] = alreadyAssigned.emplace (taskId, group.NodeId );
359
- if (success) {
360
- TasksPerNode[group.NodeId ].push_back (taskId);
361
- }
362
- }
351
+ TVector<const NKikimrKqp::TKqpNodeResources*> allNodes;
352
+ TVector<const NKikimrKqp::TKqpNodeResources*> executerDcNodes;
353
+ allNodes.reserve (ResourcesSnapshot.size ());
354
+
355
+ for (auto & snapNode: ResourcesSnapshot) {
356
+ const TString& dc = snapNode.GetKqpProxyNodeResources ().GetDataCenterId ();
357
+ if (snapNode.GetNodeId () == selfNodeId) {
358
+ selfNodeDC = dc;
359
+ break ;
363
360
}
361
+ }
364
362
365
- return nullptr ;
366
- } else {
363
+ for (auto & snapNode: ResourcesSnapshot) {
364
+ allNodes.push_back (&snapNode);
365
+ if (selfNodeDC == snapNode.GetKqpProxyNodeResources ().GetDataCenterId ()) {
366
+ executerDcNodes.push_back (&snapNode);
367
+ }
368
+ }
369
+
370
+ TVector<IKqpPlannerStrategy::TResult> plan;
371
+
372
+ if (!executerDcNodes.empty () && placingOptions.PreferLocalDatacenterExecution ) {
373
+ plan = planner->Plan (executerDcNodes, ResourceEstimations);
374
+ }
375
+
376
+ if (plan.empty ()) {
377
+ plan = planner->Plan (allNodes, ResourceEstimations);
378
+ }
379
+
380
+ if (plan.empty ()) {
367
381
LogMemoryStatistics ([TxId = TxId, &UserRequestContext = UserRequestContext](TStringBuf msg) { LOG_E (msg); });
368
382
369
383
auto ev = MakeHolder<TEvKqp::TEvAbortExecution>(NYql::NDqProto::StatusIds::PRECONDITION_FAILED,
370
384
TStringBuilder () << " Not enough resources to execute query. " << " TraceId: " << UserRequestContext->TraceId );
371
385
return std::make_unique<IEventHandle>(ExecuterId, ExecuterId, ev.Release ());
372
386
}
387
+
388
+ for (auto & group : plan) {
389
+ for (ui64 taskId: group.TaskIds ) {
390
+ auto [it, success] = alreadyAssigned.emplace (taskId, group.NodeId );
391
+ if (success) {
392
+ TasksPerNode[group.NodeId ].push_back (taskId);
393
+ }
394
+ }
395
+ }
396
+
397
+ return nullptr ;
373
398
}
374
399
375
400
const IKqpGateway::TKqpSnapshot& TKqpPlanner::GetSnapshot () const {
0 commit comments