Skip to content

Commit 881ce4a

Browse files
authored
Implement merge reads option (#9206)
1 parent 4cd0d35 commit 881ce4a

File tree

5 files changed

+56
-1
lines changed

5 files changed

+56
-1
lines changed

ydb/core/kqp/common/control.cpp

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
#include "control.h"
2+
3+
#include <ydb/core/base/appdata_fwd.h>
4+
#include <ydb/core/control/immediate_control_board_impl.h>
5+
6+
namespace {
7+
8+
struct TControls {
9+
std::shared_ptr<NKikimr::TControlWrapper> MergeReads;
10+
11+
TControls() {
12+
if (auto *appData = NKikimr::AppData()) {
13+
if (appData->Icb) {
14+
MergeReads = std::make_shared<NKikimr::TControlWrapper>(0, 0, 1);
15+
appData->Icb->RegisterSharedControl(*MergeReads,
16+
"TableServiceControls.EnableMergeDatashardReads");
17+
}
18+
}
19+
20+
}
21+
};
22+
23+
}
24+
25+
namespace NKikimr::NKqp {
26+
27+
std::shared_ptr<TControlWrapper> MergeDatashardReadsControl() {
28+
return Singleton<TControls>()->MergeReads;
29+
}
30+
31+
} // namespace NKikimr::NKqp

ydb/core/kqp/common/control.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
#include <ydb/core/control/immediate_control_board_wrapper.h>
2+
3+
namespace NKikimr::NKqp {
4+
5+
std::shared_ptr<TControlWrapper> MergeDatashardReadsControl();
6+
7+
} // namespace NKikimr::NKqp

ydb/core/kqp/common/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
LIBRARY()
22

33
SRCS(
4+
control.cpp
45
kqp_event_ids.h
56
kqp_event_impl.cpp
67
kqp_lwtrace_probes.cpp

ydb/core/kqp/executer_actor/kqp_executer_impl.h

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
#include <ydb/core/kqp/opt/kqp_query_plan.h>
2929
#include <ydb/core/kqp/rm_service/kqp_rm_service.h>
3030
#include <ydb/core/grpc_services/local_rate_limiter.h>
31+
#include <ydb/core/kqp/common/control.h>
3132

3233
#include <ydb/services/metadata/secret/fetcher.h>
3334
#include <ydb/services/metadata/secret/snapshot.h>
@@ -146,6 +147,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
146147
, StreamResult(streamResult)
147148
, StatementResultIndex(statementResultIndex)
148149
{
150+
EnableReadsMerge = *MergeDatashardReadsControl() == 1;
149151
TasksGraph.GetMeta().Snapshot = IKqpGateway::TKqpSnapshot(Request.Snapshot.Step, Request.Snapshot.TxId);
150152
TasksGraph.GetMeta().Arena = MakeIntrusive<NActors::TProtoArenaHolder>();
151153
TasksGraph.GetMeta().Database = Database;
@@ -1084,7 +1086,11 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
10841086
return result;
10851087
}
10861088

1087-
TMaybe<size_t> BuildScanTasksFromSource(TStageInfo& stageInfo, const bool shardsResolved, const bool limitTasksPerNode) {
1089+
TMaybe<size_t> BuildScanTasksFromSource(TStageInfo& stageInfo, const bool shardsResolved, bool limitTasksPerNode) {
1090+
if (EnableReadsMerge) {
1091+
limitTasksPerNode = true;
1092+
}
1093+
10881094
THashMap<ui64, std::vector<ui64>> nodeTasks;
10891095
THashMap<ui64, ui64> assignedShardsCount;
10901096

@@ -2026,6 +2032,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
20262032

20272033
ui32 StatementResultIndex;
20282034
bool AlreadyReplied = false;
2035+
bool EnableReadsMerge = false;
20292036

20302037
private:
20312038
static constexpr TDuration ResourceUsageUpdateInterval = TDuration::MilliSeconds(100);

ydb/core/protos/config.proto

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1475,6 +1475,14 @@ message TImmediateControlsConfig {
14751475
DefaultValue: 0 }];
14761476
}
14771477

1478+
message TTableServiceControls {
1479+
optional uint64 EnableMergeDatashardReads = 1 [(ControlOptions) = {
1480+
Description: "Merge reading tasks on the same node",
1481+
MinValue: 0,
1482+
MaxValue: 1,
1483+
DefaultValue: 0 }];
1484+
}
1485+
14781486
optional TDataShardControls DataShardControls = 1;
14791487
optional TTxLimitControls TxLimitControls = 2;
14801488
optional TCoordinatorControls CoordinatorControls = 3;
@@ -1486,6 +1494,7 @@ message TImmediateControlsConfig {
14861494
optional TDSProxyControls DSProxyControls = 9;
14871495
optional TPDiskControls PDiskControls = 10;
14881496
optional TBlobStorageControllerControls BlobStorageControllerControls = 11;
1497+
optional TTableServiceControls TableServiceControls = 12;
14891498
};
14901499

14911500
message TMeteringConfig {

0 commit comments

Comments
 (0)