Skip to content

Commit e4b8581

Browse files
authored
Merge cb8cfb3 into f5945a4
2 parents f5945a4 + cb8cfb3 commit e4b8581

File tree

5 files changed

+26
-2
lines changed

5 files changed

+26
-2
lines changed

ydb/core/kqp/executer_actor/kqp_data_executer.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2410,7 +2410,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
24102410

24112411
const bool singlePartitionOptAllowed = !HasOlapTable && !UnknownAffectedShardCount && !HasExternalSources && DatashardTxs.empty() && EvWriteTxs.empty();
24122412
const bool useDataQueryPool = !(HasExternalSources && DatashardTxs.empty() && EvWriteTxs.empty());
2413-
const bool localComputeTasks = !((HasExternalSources || HasOlapTable || HasDatashardSourceScan) && DatashardTxs.empty());
2413+
const bool localComputeTasks = !DatashardTxs.empty();
24142414

24152415
Planner = CreateKqpPlanner({
24162416
.TasksGraph = TasksGraph,

ydb/core/kqp/executer_actor/kqp_planner.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,10 @@ std::unique_ptr<IEventHandle> TKqpPlanner::AssignTasksToNodes() {
255255
return nullptr;
256256
}
257257

258+
if (ResourcesSnapshot.empty()) {
259+
ResourcesSnapshot = std::move(GetKqpResourceManager()->GetClusterResources());
260+
}
261+
258262
if (ResourcesSnapshot.empty() || (ResourcesSnapshot.size() == 1 && ResourcesSnapshot[0].GetNodeId() == ExecuterId.NodeId())) {
259263
// try to run without memory overflow settings
260264
if (LocalRunMemoryEst <= localResources.Memory[NRm::EKqpMemoryPool::ScanQuery] &&
@@ -407,6 +411,8 @@ std::unique_ptr<IEventHandle> TKqpPlanner::PlanExecution() {
407411

408412
nComputeTasks = ComputeTasks.size();
409413

414+
// explicit requirement to execute task on the same node because it has dependencies
415+
// on datashard tx.
410416
if (LocalComputeTasks) {
411417
bool shareMailbox = (ComputeTasks.size() <= 1);
412418
for (ui64 taskId : ComputeTasks) {

ydb/core/kqp/executer_actor/kqp_planner.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ class TKqpPlanner {
104104
const bool WithSpilling;
105105
const TMaybe<NKikimrKqp::TRlPath> RlPath;
106106
THashSet<ui32> TrackingNodes;
107-
const TVector<NKikimrKqp::TKqpNodeResources> ResourcesSnapshot;
107+
TVector<NKikimrKqp::TKqpNodeResources> ResourcesSnapshot;
108108
NWilson::TSpan& ExecuterSpan;
109109
const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& ExecuterRetriesConfig;
110110
ui64 LocalRunMemoryEst = 0;

ydb/core/kqp/rm_service/kqp_rm_service.cpp

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -599,6 +599,23 @@ class TKqpResourceManager : public IKqpResourceManager {
599599
FireResourcesPublishing();
600600
}
601601

602+
TVector<NKikimrKqp::TKqpNodeResources> GetClusterResources() const override {
603+
TVector<NKikimrKqp::TKqpNodeResources> resources;
604+
Y_ABORT_UNLESS(PublishResourcesByExchanger);
605+
606+
if (PublishResourcesByExchanger) {
607+
std::shared_ptr<TVector<NKikimrKqp::TKqpNodeResources>> infos;
608+
with_lock (ResourceSnapshotState->Lock) {
609+
infos = ResourceSnapshotState->Snapshot;
610+
}
611+
if (infos != nullptr) {
612+
resources = *infos;
613+
}
614+
}
615+
616+
return resources;
617+
}
618+
602619
void RequestClusterResourcesInfo(TOnResourcesSnapshotCallback&& callback) override {
603620
LOG_AS_D("Schedule Snapshot request");
604621
if (PublishResourcesByExchanger) {

ydb/core/kqp/rm_service/kqp_rm_service.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ class IKqpResourceManager : private TNonCopyable {
9191

9292
virtual void RequestClusterResourcesInfo(TOnResourcesSnapshotCallback&& callback) = 0;
9393

94+
virtual TVector<NKikimrKqp::TKqpNodeResources> GetClusterResources() const = 0;
9495
virtual TKqpLocalNodeResources GetLocalResources() const = 0;
9596
virtual NKikimrConfig::TTableServiceConfig::TResourceManager GetConfig() = 0;
9697

0 commit comments

Comments
 (0)