Skip to content

Commit f41490e

Browse files
committed
Revert "relax local compute tasks (ydb-platform#4264)"
This reverts commit 240232b.
1 parent d102931 commit f41490e

File tree

6 files changed

+23
-47
lines changed

6 files changed

+23
-47
lines changed

ydb/core/kqp/executer_actor/kqp_data_executer.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2463,7 +2463,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
24632463

24642464
const bool singlePartitionOptAllowed = !HasOlapTable && !UnknownAffectedShardCount && !HasExternalSources && DatashardTxs.empty() && EvWriteTxs.empty();
24652465
const bool useDataQueryPool = !(HasExternalSources && DatashardTxs.empty() && EvWriteTxs.empty());
2466-
const bool localComputeTasks = !DatashardTxs.empty();
2466+
const bool localComputeTasks = !((HasExternalSources || HasOlapTable || HasDatashardSourceScan) && DatashardTxs.empty());
24672467

24682468
Planner = CreateKqpPlanner({
24692469
.TasksGraph = TasksGraph,

ydb/core/kqp/executer_actor/kqp_planner.cpp

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -255,10 +255,6 @@ std::unique_ptr<IEventHandle> TKqpPlanner::AssignTasksToNodes() {
255255
return nullptr;
256256
}
257257

258-
if (ResourcesSnapshot.empty()) {
259-
ResourcesSnapshot = std::move(GetKqpResourceManager()->GetClusterResources());
260-
}
261-
262258
if (ResourcesSnapshot.empty() || (ResourcesSnapshot.size() == 1 && ResourcesSnapshot[0].GetNodeId() == ExecuterId.NodeId())) {
263259
// try to run without memory overflow settings
264260
if (LocalRunMemoryEst <= localResources.Memory[NRm::EKqpMemoryPool::ScanQuery] &&
@@ -411,8 +407,6 @@ std::unique_ptr<IEventHandle> TKqpPlanner::PlanExecution() {
411407

412408
nComputeTasks = ComputeTasks.size();
413409

414-
// explicit requirement to execute task on the same node because it has dependencies
415-
// on datashard tx.
416410
if (LocalComputeTasks) {
417411
bool shareMailbox = (ComputeTasks.size() <= 1);
418412
for (ui64 taskId : ComputeTasks) {

ydb/core/kqp/executer_actor/kqp_planner.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ class TKqpPlanner {
104104
const bool WithSpilling;
105105
const TMaybe<NKikimrKqp::TRlPath> RlPath;
106106
THashSet<ui32> TrackingNodes;
107-
TVector<NKikimrKqp::TKqpNodeResources> ResourcesSnapshot;
107+
const TVector<NKikimrKqp::TKqpNodeResources> ResourcesSnapshot;
108108
NWilson::TSpan& ExecuterSpan;
109109
const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& ExecuterRetriesConfig;
110110
ui64 LocalRunMemoryEst = 0;

ydb/core/kqp/rm_service/kqp_rm_service.cpp

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -599,23 +599,6 @@ class TKqpResourceManager : public IKqpResourceManager {
599599
FireResourcesPublishing();
600600
}
601601

602-
TVector<NKikimrKqp::TKqpNodeResources> GetClusterResources() const override {
603-
TVector<NKikimrKqp::TKqpNodeResources> resources;
604-
Y_ABORT_UNLESS(PublishResourcesByExchanger);
605-
606-
if (PublishResourcesByExchanger) {
607-
std::shared_ptr<TVector<NKikimrKqp::TKqpNodeResources>> infos;
608-
with_lock (ResourceSnapshotState->Lock) {
609-
infos = ResourceSnapshotState->Snapshot;
610-
}
611-
if (infos != nullptr) {
612-
resources = *infos;
613-
}
614-
}
615-
616-
return resources;
617-
}
618-
619602
void RequestClusterResourcesInfo(TOnResourcesSnapshotCallback&& callback) override {
620603
LOG_AS_D("Schedule Snapshot request");
621604
if (PublishResourcesByExchanger) {

ydb/core/kqp/rm_service/kqp_rm_service.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,6 @@ class IKqpResourceManager : private TNonCopyable {
9191

9292
virtual void RequestClusterResourcesInfo(TOnResourcesSnapshotCallback&& callback) = 0;
9393

94-
virtual TVector<NKikimrKqp::TKqpNodeResources> GetClusterResources() const = 0;
9594
virtual TKqpLocalNodeResources GetLocalResources() const = 0;
9695
virtual NKikimrConfig::TTableServiceConfig::TResourceManager GetConfig() = 0;
9796

ydb/core/tx/datashard/datashard_ut_trace.cpp

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -87,10 +87,10 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) {
8787
auto [runtime, server, sender] = TestCreateServer();
8888

8989
CreateShardedTable(server, sender, "/Root", "table-1", 1, false);
90-
90+
9191
TFakeWilsonUploader *uploader = new TFakeWilsonUploader();
9292
TActorId uploaderId = runtime.Register(uploader, 0);
93-
runtime.RegisterService(NWilson::MakeWilsonUploaderId(), uploaderId, 0);
93+
runtime.RegisterService(NWilson::MakeWilsonUploaderId(), uploaderId, 0);
9494
runtime.SimulateSleep(TDuration::Seconds(10));
9595

9696
const bool usesVolatileTxs = runtime.GetAppData(0).FeatureFlags.GetEnableDataShardVolatileTransactions();
@@ -129,7 +129,7 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) {
129129
CheckTxHasDatashardUnits(propose, 3);
130130

131131
auto progress = tabletTxs[1];
132-
CheckTxHasWriteLog(progress);
132+
CheckTxHasWriteLog(progress);
133133
CheckTxHasDatashardUnits(progress, usesVolatileTxs ? 6 : 11);
134134
}
135135

@@ -166,12 +166,12 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) {
166166
Y_UNIT_TEST(TestTraceDistributedSelect) {
167167
auto [runtime, server, sender] = TestCreateServer();
168168
bool bTreeIndex = runtime.GetAppData().FeatureFlags.GetEnableLocalDBBtreeIndex();
169-
169+
170170
CreateShardedTable(server, sender, "/Root", "table-1", 1, false);
171-
171+
172172
TFakeWilsonUploader *uploader = new TFakeWilsonUploader();
173173
TActorId uploaderId = runtime.Register(uploader, 0);
174-
runtime.RegisterService(NWilson::MakeWilsonUploaderId(), uploaderId, 0);
174+
runtime.RegisterService(NWilson::MakeWilsonUploaderId(), uploaderId, 0);
175175
runtime.SimulateSleep(TDuration::Seconds(10));
176176

177177
SplitTable(runtime, server, 5);
@@ -230,7 +230,7 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) {
230230
UNIT_ASSERT_VALUES_EQUAL(dsReads.size(), 2);
231231

232232
canon = "(Session.query.QUERY_ACTION_EXECUTE -> [(CompileService -> [(CompileActor)]) , (LiteralExecuter) "
233-
", (DataExecuter -> [(WaitForTableResolve) , (WaitForShardsResolve) , (WaitForSnapshot) "
233+
", (DataExecuter -> [(WaitForTableResolve) , (WaitForShardsResolve) , (WaitForSnapshot) , (ComputeActor) "
234234
", (RunTasks) , (KqpNode.SendTasks) , (ComputeActor -> [(ReadActor -> [(WaitForShardsResolve) , (Datashard.Read "
235235
"-> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> [(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) "
236236
", (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) , (Tablet.Transaction.Execute -> [(Datashard.Unit)]) "
@@ -239,18 +239,18 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) {
239239
"-> [(Tablet.Transaction.Execute -> [(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.Transaction.Wait) "
240240
", (Tablet.Transaction.Enqueued) , (Tablet.Transaction.Execute -> [(Datashard.Unit)]) , (Tablet.Transaction.Wait) "
241241
", (Tablet.Transaction.Enqueued) , (Tablet.Transaction.Execute -> [(Datashard.Unit) , (Datashard.Unit)]) , (Tablet.WriteLog "
242-
"-> [(Tablet.WriteLog.LogEntry)])])])])]) , (ComputeActor)])])";
242+
"-> [(Tablet.WriteLog.LogEntry)])])])])])])])";
243243

244244
if (bTreeIndex) { // no index nodes (levels = 0)
245245
canon = "(Session.query.QUERY_ACTION_EXECUTE -> [(CompileService -> [(CompileActor)]) , (LiteralExecuter) "
246-
", (DataExecuter -> [(WaitForTableResolve) , (WaitForShardsResolve) , (WaitForSnapshot) "
246+
", (DataExecuter -> [(WaitForTableResolve) , (WaitForShardsResolve) , (WaitForSnapshot) , (ComputeActor) "
247247
", (RunTasks) , (KqpNode.SendTasks) , (ComputeActor -> [(ReadActor -> [(WaitForShardsResolve) , (Datashard.Read "
248248
"-> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> [(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) "
249249
", (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) , (Tablet.Transaction.Execute -> [(Datashard.Unit) "
250250
", (Datashard.Unit)]) , (Tablet.WriteLog -> [(Tablet.WriteLog.LogEntry)])])]) , (Datashard.Read -> [(Tablet.Transaction "
251251
"-> [(Tablet.Transaction.Execute -> [(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.Transaction.Wait) "
252252
", (Tablet.Transaction.Enqueued) , (Tablet.Transaction.Execute -> [(Datashard.Unit) , (Datashard.Unit)]) , (Tablet.WriteLog "
253-
"-> [(Tablet.WriteLog.LogEntry)])])])])]) , (ComputeActor)])])";
253+
"-> [(Tablet.WriteLog.LogEntry)])])])])])])])";
254254
}
255255

256256
} else {
@@ -281,7 +281,7 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) {
281281
}
282282

283283
canon = "(Session.query.QUERY_ACTION_EXECUTE -> [(CompileService -> [(CompileActor)]) "
284-
", (LiteralExecuter) , (DataExecuter -> [(WaitForTableResolve) , (WaitForSnapshot) , (RunTasks) , "
284+
", (LiteralExecuter) , (DataExecuter -> [(WaitForTableResolve) , (WaitForSnapshot) , (ComputeActor) , (RunTasks) , "
285285
"(Datashard.Transaction -> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> [(Datashard.Unit) , "
286286
"(Datashard.Unit) , (Datashard.Unit)]) , (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) , "
287287
"(Tablet.Transaction.Execute -> [(Datashard.Unit)]) , (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) , "
@@ -290,10 +290,10 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) {
290290
"[(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) , "
291291
"(Tablet.Transaction.Execute -> [(Datashard.Unit)]) , (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) , "
292292
"(Tablet.Transaction.Execute -> [(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.WriteLog -> "
293-
"[(Tablet.WriteLog.LogEntry)])])]) , (ComputeActor)])])";
293+
"[(Tablet.WriteLog.LogEntry)])])])])])";
294294
}
295-
296-
295+
296+
297297
UNIT_ASSERT_VALUES_EQUAL(canon, trace.ToString());
298298
}
299299

@@ -345,13 +345,13 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) {
345345
UNIT_ASSERT_VALUES_EQUAL(dsReads.size(), 2);
346346

347347
std::string canon = "(Session.query.QUERY_ACTION_EXECUTE -> [(CompileService -> [(CompileActor)]) , "
348-
"(DataExecuter -> [(WaitForTableResolve) , (WaitForShardsResolve) , (WaitForSnapshot) , "
348+
"(DataExecuter -> [(WaitForTableResolve) , (WaitForShardsResolve) , (WaitForSnapshot) , (ComputeActor) , "
349349
"(RunTasks) , (KqpNode.SendTasks) , (ComputeActor -> [(ReadActor -> [(WaitForShardsResolve) , "
350350
"(Datashard.Read -> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> [(Datashard.Unit) , "
351351
"(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.WriteLog -> [(Tablet.WriteLog.LogEntry)])])"
352352
"]) , (Datashard.Read -> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> "
353353
"[(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.WriteLog -> "
354-
"[(Tablet.WriteLog.LogEntry)])])])])]) , (ComputeActor)])])";
354+
"[(Tablet.WriteLog.LogEntry)])])])])])])])";
355355
UNIT_ASSERT_VALUES_EQUAL(canon, trace.ToString());
356356
}
357357

@@ -363,7 +363,7 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) {
363363

364364
TFakeWilsonUploader *uploader = new TFakeWilsonUploader();
365365
TActorId uploaderId = runtime.Register(uploader, 0);
366-
runtime.RegisterService(NWilson::MakeWilsonUploaderId(), uploaderId, 0);
366+
runtime.RegisterService(NWilson::MakeWilsonUploaderId(), uploaderId, 0);
367367
runtime.SimulateSleep(TDuration::Seconds(10));
368368

369369
NWilson::TTraceId traceId = NWilson::TTraceId::NewTraceId(15, 4095);
@@ -380,16 +380,16 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) {
380380
UNIT_ASSERT_VALUES_EQUAL(1, uploader->Traces.size());
381381

382382
TFakeWilsonUploader::Trace &trace = uploader->Traces.begin()->second;
383-
383+
384384
auto wtSpan = trace.Root.BFSFindOne("Datashard.WriteTransaction");
385385
UNIT_ASSERT(wtSpan);
386-
386+
387387
auto tabletTxs = wtSpan->get().FindAll("Tablet.Transaction");
388388
UNIT_ASSERT_VALUES_EQUAL(1, tabletTxs.size());
389389
auto writeTx = tabletTxs[0];
390390

391-
CheckTxHasWriteLog(writeTx);
392-
CheckTxHasDatashardUnits(writeTx, 5);
391+
CheckTxHasWriteLog(writeTx);
392+
CheckTxHasDatashardUnits(writeTx, 5);
393393

394394
std::string canon = "(Datashard.WriteTransaction -> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> "
395395
"[(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.WriteLog -> "

0 commit comments

Comments
 (0)