Skip to content

Commit fada176

Browse files
authored
Merge dd57ad1 into 2f366dc
2 parents 2f366dc + dd57ad1 commit fada176

File tree

6 files changed

+74
-29
lines changed

6 files changed

+74
-29
lines changed

ydb/core/kqp/executer_actor/kqp_data_executer.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2462,7 +2462,8 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
24622462

24632463
const bool singlePartitionOptAllowed = !HasOlapTable && !UnknownAffectedShardCount && !HasExternalSources && DatashardTxs.empty() && EvWriteTxs.empty();
24642464
const bool useDataQueryPool = !(HasExternalSources && DatashardTxs.empty() && EvWriteTxs.empty());
2465-
const bool localComputeTasks = !((HasExternalSources || HasOlapTable || HasDatashardSourceScan) && DatashardTxs.empty());
2465+
const bool localComputeTasks = !DatashardTxs.empty();
2466+
const bool mayRunTasksLocally = !((HasExternalSources || HasOlapTable || HasDatashardSourceScan) && DatashardTxs.empty());
24662467

24672468
Planner = CreateKqpPlanner({
24682469
.TasksGraph = TasksGraph,
@@ -2486,7 +2487,8 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
24862487
.UserRequestContext = GetUserRequestContext(),
24872488
.FederatedQuerySetup = FederatedQuerySetup,
24882489
.OutputChunkMaxSize = Request.OutputChunkMaxSize,
2489-
.GUCSettings = GUCSettings
2490+
.GUCSettings = GUCSettings,
2491+
.MayRunTasksLocally = mayRunTasksLocally
24902492
});
24912493

24922494
auto err = Planner->PlanExecution();

ydb/core/kqp/executer_actor/kqp_planner.cpp

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ bool TKqpPlanner::UseMockEmptyPlanner = false;
5555
// Task can allocate extra memory during execution.
5656
// So, we estimate the total memory required for a task as its a priori size multiplied by this constant.
5757
constexpr ui32 MEMORY_ESTIMATION_OVERFLOW = 2;
58-
constexpr ui32 MAX_NON_PARALLEL_TASKS_EXECUTION_LIMIT = 4;
58+
constexpr ui32 MAX_NON_PARALLEL_TASKS_EXECUTION_LIMIT = 8;
5959

6060
TKqpPlanner::TKqpPlanner(TKqpPlanner::TArgs&& args)
6161
: TxId(args.TxId)
@@ -80,6 +80,7 @@ TKqpPlanner::TKqpPlanner(TKqpPlanner::TArgs&& args)
8080
, FederatedQuerySetup(args.FederatedQuerySetup)
8181
, OutputChunkMaxSize(args.OutputChunkMaxSize)
8282
, GUCSettings(std::move(args.GUCSettings))
83+
, MayRunTasksLocally(args.MayRunTasksLocally)
8384
{
8485
if (!Database) {
8586
// a piece of magic for tests
@@ -255,6 +256,10 @@ std::unique_ptr<IEventHandle> TKqpPlanner::AssignTasksToNodes() {
255256
return nullptr;
256257
}
257258

259+
if (ResourcesSnapshot.empty()) {
260+
ResourcesSnapshot = std::move(GetKqpResourceManager()->GetClusterResources());
261+
}
262+
258263
if (ResourcesSnapshot.empty() || (ResourcesSnapshot.size() == 1 && ResourcesSnapshot[0].GetNodeId() == ExecuterId.NodeId())) {
259264
// try to run without memory overflow settings
260265
if (LocalRunMemoryEst <= localResources.Memory[NRm::EKqpMemoryPool::ScanQuery] &&
@@ -407,6 +412,8 @@ std::unique_ptr<IEventHandle> TKqpPlanner::PlanExecution() {
407412

408413
nComputeTasks = ComputeTasks.size();
409414

415+
// explicit requirement to execute task on the same node because it has dependencies
416+
// on datashard tx.
410417
if (LocalComputeTasks) {
411418
bool shareMailbox = (ComputeTasks.size() <= 1);
412419
for (ui64 taskId : ComputeTasks) {
@@ -429,7 +436,7 @@ std::unique_ptr<IEventHandle> TKqpPlanner::PlanExecution() {
429436
PendingComputeTasks.insert(taskId);
430437
}
431438

432-
for (auto& [shardId, tasks] : TasksPerNode) {
439+
for (auto& [nodeId, tasks] : TasksPerNode) {
433440
for (ui64 taskId : tasks) {
434441
PendingComputeTasks.insert(taskId);
435442
}
@@ -440,7 +447,23 @@ std::unique_ptr<IEventHandle> TKqpPlanner::PlanExecution() {
440447
return err;
441448
}
442449

450+
if (MayRunTasksLocally) {
451+
// temporary flag until common ca factory is implemented.
452+
auto tasksOnNodeIt = TasksPerNode.find(ExecuterId.NodeId());
453+
if (tasksOnNodeIt != TasksPerNode.end()) {
454+
auto& tasks = tasksOnNodeIt->second;
455+
const bool shareMailbox = (tasks.size() <= 1);
456+
for (ui64 taskId: tasks) {
457+
ExecuteDataComputeTask(taskId, shareMailbox, /* optimizeProtoForLocalExecution = */ true);
458+
PendingComputeTasks.erase(taskId);
459+
}
460+
}
461+
}
462+
443463
for(auto& [nodeId, tasks] : TasksPerNode) {
464+
if (MayRunTasksLocally && ExecuterId.NodeId() == nodeId)
465+
continue;
466+
444467
SortUnique(tasks);
445468
auto& request = Requests.emplace_back(std::move(tasks), CalcSendMessageFlagsForNode(nodeId), nodeId);
446469
request.SerializedRequest = SerializeRequest(request);

ydb/core/kqp/executer_actor/kqp_planner.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ class TKqpPlanner {
6363
const std::optional<TKqpFederatedQuerySetup>& FederatedQuerySetup;
6464
const ui64 OutputChunkMaxSize = 0;
6565
const TGUCSettings::TPtr GUCSettings;
66+
const bool MayRunTasksLocally = false;
6667
};
6768

6869
TKqpPlanner(TKqpPlanner::TArgs&& args);
@@ -104,7 +105,7 @@ class TKqpPlanner {
104105
const bool WithSpilling;
105106
const TMaybe<NKikimrKqp::TRlPath> RlPath;
106107
THashSet<ui32> TrackingNodes;
107-
const TVector<NKikimrKqp::TKqpNodeResources> ResourcesSnapshot;
108+
TVector<NKikimrKqp::TKqpNodeResources> ResourcesSnapshot;
108109
NWilson::TSpan& ExecuterSpan;
109110
const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& ExecuterRetriesConfig;
110111
ui64 LocalRunMemoryEst = 0;
@@ -126,6 +127,7 @@ class TKqpPlanner {
126127
const std::optional<TKqpFederatedQuerySetup> FederatedQuerySetup;
127128
const ui64 OutputChunkMaxSize;
128129
const TGUCSettings::TPtr GUCSettings;
130+
const bool MayRunTasksLocally;
129131

130132
public:
131133
static bool UseMockEmptyPlanner; // for tests: if true then use TKqpMockEmptyPlanner that leads to the error

ydb/core/kqp/rm_service/kqp_rm_service.cpp

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -475,6 +475,23 @@ class TKqpResourceManager : public IKqpResourceManager {
475475
FireResourcesPublishing();
476476
}
477477

478+
// Returns, by value, a copy of the cluster resources snapshot currently referenced by
// ResourceSnapshotState->Snapshot. The result is an empty vector when that pointer is null.
TVector<NKikimrKqp::TKqpNodeResources> GetClusterResources() const override {
479+
TVector<NKikimrKqp::TKqpNodeResources> resources;
480+
// Hard precondition: this accessor is only valid when PublishResourcesByExchanger is set.
// NOTE(review): the `if (PublishResourcesByExchanger)` below re-tests the condition asserted
// here, so it looks redundant — confirm before simplifying.
Y_ABORT_UNLESS(PublishResourcesByExchanger);
481+
482+
if (PublishResourcesByExchanger) {
483+
std::shared_ptr<TVector<NKikimrKqp::TKqpNodeResources>> infos;
484+
// Only the shared_ptr is copied inside the with_lock scope (presumably holding
// ResourceSnapshotState->Lock for the braced region — verify against the macro).
with_lock (ResourceSnapshotState->Lock) {
485+
infos = ResourceSnapshotState->Snapshot;
486+
}
487+
// The vector itself is copied after the with_lock scope has closed.
if (infos != nullptr) {
488+
resources = *infos;
489+
}
490+
}
491+
492+
return resources;
493+
}
494+
478495
void RequestClusterResourcesInfo(TOnResourcesSnapshotCallback&& callback) override {
479496
LOG_AS_D("Schedule Snapshot request");
480497
if (PublishResourcesByExchanger) {

ydb/core/kqp/rm_service/kqp_rm_service.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ class IKqpResourceManager : private TNonCopyable {
9292

9393
virtual void RequestClusterResourcesInfo(TOnResourcesSnapshotCallback&& callback) = 0;
9494

95+
virtual TVector<NKikimrKqp::TKqpNodeResources> GetClusterResources() const = 0;
9596
virtual TKqpLocalNodeResources GetLocalResources() const = 0;
9697
virtual NKikimrConfig::TTableServiceConfig::TResourceManager GetConfig() = 0;
9798

ydb/core/tx/datashard/datashard_ut_trace.cpp

Lines changed: 24 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -87,10 +87,10 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) {
8787
auto [runtime, server, sender] = TestCreateServer();
8888

8989
CreateShardedTable(server, sender, "/Root", "table-1", 1, false);
90-
90+
9191
TFakeWilsonUploader *uploader = new TFakeWilsonUploader();
9292
TActorId uploaderId = runtime.Register(uploader, 0);
93-
runtime.RegisterService(NWilson::MakeWilsonUploaderId(), uploaderId, 0);
93+
runtime.RegisterService(NWilson::MakeWilsonUploaderId(), uploaderId, 0);
9494
runtime.SimulateSleep(TDuration::Seconds(10));
9595

9696
const bool usesVolatileTxs = runtime.GetAppData(0).FeatureFlags.GetEnableDataShardVolatileTransactions();
@@ -129,7 +129,7 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) {
129129
CheckTxHasDatashardUnits(propose, 3);
130130

131131
auto progress = tabletTxs[1];
132-
CheckTxHasWriteLog(progress);
132+
CheckTxHasWriteLog(progress);
133133
CheckTxHasDatashardUnits(progress, usesVolatileTxs ? 6 : 11);
134134
}
135135

@@ -166,12 +166,12 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) {
166166
Y_UNIT_TEST(TestTraceDistributedSelect) {
167167
auto [runtime, server, sender] = TestCreateServer();
168168
bool bTreeIndex = runtime.GetAppData().FeatureFlags.GetEnableLocalDBBtreeIndex();
169-
169+
170170
CreateShardedTable(server, sender, "/Root", "table-1", 1, false);
171-
171+
172172
TFakeWilsonUploader *uploader = new TFakeWilsonUploader();
173173
TActorId uploaderId = runtime.Register(uploader, 0);
174-
runtime.RegisterService(NWilson::MakeWilsonUploaderId(), uploaderId, 0);
174+
runtime.RegisterService(NWilson::MakeWilsonUploaderId(), uploaderId, 0);
175175
runtime.SimulateSleep(TDuration::Seconds(10));
176176

177177
SplitTable(runtime, server, 5);
@@ -230,27 +230,27 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) {
230230
UNIT_ASSERT_VALUES_EQUAL(dsReads.size(), 2);
231231

232232
canon = "(Session.query.QUERY_ACTION_EXECUTE -> [(CompileService -> [(CompileActor)]) , (LiteralExecuter) "
233-
", (DataExecuter -> [(WaitForTableResolve) , (WaitForShardsResolve) , (WaitForSnapshot) , (ComputeActor) "
234-
", (RunTasks) , (KqpNode.SendTasks) , (ComputeActor -> [(ReadActor -> [(WaitForShardsResolve) , (Datashard.Read "
233+
", (DataExecuter -> [(WaitForTableResolve) , (WaitForShardsResolve) , (WaitForSnapshot) "
234+
", (ComputeActor -> [(ReadActor -> [(WaitForShardsResolve) , (Datashard.Read "
235235
"-> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> [(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) "
236236
", (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) , (Tablet.Transaction.Execute -> [(Datashard.Unit)]) "
237237
", (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) , (Tablet.Transaction.Execute -> [(Datashard.Unit) "
238238
", (Datashard.Unit)]) , (Tablet.WriteLog -> [(Tablet.WriteLog.LogEntry)])])]) , (Datashard.Read -> [(Tablet.Transaction "
239239
"-> [(Tablet.Transaction.Execute -> [(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.Transaction.Wait) "
240240
", (Tablet.Transaction.Enqueued) , (Tablet.Transaction.Execute -> [(Datashard.Unit)]) , (Tablet.Transaction.Wait) "
241241
", (Tablet.Transaction.Enqueued) , (Tablet.Transaction.Execute -> [(Datashard.Unit) , (Datashard.Unit)]) , (Tablet.WriteLog "
242-
"-> [(Tablet.WriteLog.LogEntry)])])])])])])])";
242+
"-> [(Tablet.WriteLog.LogEntry)])])])])]) , (ComputeActor), (RunTasks)])])";
243243

244244
if (bTreeIndex) { // no index nodes (levels = 0)
245245
canon = "(Session.query.QUERY_ACTION_EXECUTE -> [(CompileService -> [(CompileActor)]) , (LiteralExecuter) "
246-
", (DataExecuter -> [(WaitForTableResolve) , (WaitForShardsResolve) , (WaitForSnapshot) , (ComputeActor) "
247-
", (RunTasks) , (KqpNode.SendTasks) , (ComputeActor -> [(ReadActor -> [(WaitForShardsResolve) , (Datashard.Read "
246+
", (DataExecuter -> [(WaitForTableResolve) , (WaitForShardsResolve) , (WaitForSnapshot) "
247+
", (ComputeActor -> [(ReadActor -> [(WaitForShardsResolve) , (Datashard.Read "
248248
"-> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> [(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) "
249249
", (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) , (Tablet.Transaction.Execute -> [(Datashard.Unit) "
250250
", (Datashard.Unit)]) , (Tablet.WriteLog -> [(Tablet.WriteLog.LogEntry)])])]) , (Datashard.Read -> [(Tablet.Transaction "
251251
"-> [(Tablet.Transaction.Execute -> [(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.Transaction.Wait) "
252252
", (Tablet.Transaction.Enqueued) , (Tablet.Transaction.Execute -> [(Datashard.Unit) , (Datashard.Unit)]) , (Tablet.WriteLog "
253-
"-> [(Tablet.WriteLog.LogEntry)])])])])])])])";
253+
"-> [(Tablet.WriteLog.LogEntry)])])])])]) , (ComputeActor) , (RunTasks)])])";
254254
}
255255

256256
} else {
@@ -281,7 +281,7 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) {
281281
}
282282

283283
canon = "(Session.query.QUERY_ACTION_EXECUTE -> [(CompileService -> [(CompileActor)]) "
284-
", (LiteralExecuter) , (DataExecuter -> [(WaitForTableResolve) , (WaitForSnapshot) , (ComputeActor) , (RunTasks) , "
284+
", (LiteralExecuter) , (DataExecuter -> [(WaitForTableResolve) , (WaitForSnapshot) , (RunTasks) , "
285285
"(Datashard.Transaction -> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> [(Datashard.Unit) , "
286286
"(Datashard.Unit) , (Datashard.Unit)]) , (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) , "
287287
"(Tablet.Transaction.Execute -> [(Datashard.Unit)]) , (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) , "
@@ -290,10 +290,10 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) {
290290
"[(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) , "
291291
"(Tablet.Transaction.Execute -> [(Datashard.Unit)]) , (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) , "
292292
"(Tablet.Transaction.Execute -> [(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.WriteLog -> "
293-
"[(Tablet.WriteLog.LogEntry)])])])])])";
293+
"[(Tablet.WriteLog.LogEntry)])])]) , (ComputeActor)])])";
294294
}
295-
296-
295+
296+
297297
UNIT_ASSERT_VALUES_EQUAL(canon, trace.ToString());
298298
}
299299

@@ -345,13 +345,13 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) {
345345
UNIT_ASSERT_VALUES_EQUAL(dsReads.size(), 2);
346346

347347
std::string canon = "(Session.query.QUERY_ACTION_EXECUTE -> [(CompileService -> [(CompileActor)]) , "
348-
"(DataExecuter -> [(WaitForTableResolve) , (WaitForShardsResolve) , (WaitForSnapshot) , (ComputeActor) , "
349-
"(RunTasks) , (KqpNode.SendTasks) , (ComputeActor -> [(ReadActor -> [(WaitForShardsResolve) , "
348+
"(DataExecuter -> [(WaitForTableResolve) , (WaitForShardsResolve) , (WaitForSnapshot) , "
349+
"(ComputeActor -> [(ReadActor -> [(WaitForShardsResolve) , "
350350
"(Datashard.Read -> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> [(Datashard.Unit) , "
351351
"(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.WriteLog -> [(Tablet.WriteLog.LogEntry)])])"
352352
"]) , (Datashard.Read -> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> "
353353
"[(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.WriteLog -> "
354-
"[(Tablet.WriteLog.LogEntry)])])])])])])])";
354+
"[(Tablet.WriteLog.LogEntry)])])])])]) , (ComputeActor) , (RunTasks)])])";
355355
UNIT_ASSERT_VALUES_EQUAL(canon, trace.ToString());
356356
}
357357

@@ -363,7 +363,7 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) {
363363

364364
TFakeWilsonUploader *uploader = new TFakeWilsonUploader();
365365
TActorId uploaderId = runtime.Register(uploader, 0);
366-
runtime.RegisterService(NWilson::MakeWilsonUploaderId(), uploaderId, 0);
366+
runtime.RegisterService(NWilson::MakeWilsonUploaderId(), uploaderId, 0);
367367
runtime.SimulateSleep(TDuration::Seconds(10));
368368

369369
NWilson::TTraceId traceId = NWilson::TTraceId::NewTraceId(15, 4095);
@@ -380,16 +380,16 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) {
380380
UNIT_ASSERT_VALUES_EQUAL(1, uploader->Traces.size());
381381

382382
TFakeWilsonUploader::Trace &trace = uploader->Traces.begin()->second;
383-
383+
384384
auto wtSpan = trace.Root.BFSFindOne("Datashard.WriteTransaction");
385385
UNIT_ASSERT(wtSpan);
386-
386+
387387
auto tabletTxs = wtSpan->get().FindAll("Tablet.Transaction");
388388
UNIT_ASSERT_VALUES_EQUAL(1, tabletTxs.size());
389389
auto writeTx = tabletTxs[0];
390390

391-
CheckTxHasWriteLog(writeTx);
392-
CheckTxHasDatashardUnits(writeTx, 5);
391+
CheckTxHasWriteLog(writeTx);
392+
CheckTxHasDatashardUnits(writeTx, 5);
393393

394394
std::string canon = "(Datashard.WriteTransaction -> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> "
395395
"[(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.WriteLog -> "

0 commit comments

Comments
 (0)