Skip to content

Commit 56e65a1

Browse files
committed
Account usage from ColumnShard scans in KQP scheduler
1 parent 575f48f commit 56e65a1

File tree

32 files changed

+123
-52
lines changed

32 files changed

+123
-52
lines changed

ydb/core/base/appdata_fwd.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@ namespace NKikimr {
2525
namespace NJaegerTracing {
2626
class TSamplingThrottlingConfigurator;
2727
}
28+
namespace NKqp::NScheduler {
29+
class TComputeScheduler;
30+
}
2831
}
2932

3033
namespace NKikimrCms {
@@ -303,6 +306,8 @@ struct TAppData {
303306
// Tracing configurator (look for tracing config in ydb/core/jaeger_tracing/actors_tracing_control)
304307
TIntrusivePtr<NKikimr::NJaegerTracing::TSamplingThrottlingConfigurator> TracingConfigurator;
305308

309+
std::shared_ptr<NKqp::NScheduler::TComputeScheduler> ComputeScheduler;
310+
306311
TAppData(
307312
ui32 sysPoolId, ui32 userPoolId, ui32 ioPoolId, ui32 batchPoolId,
308313
TMap<TString, ui32> servicePools,

ydb/core/kqp/proxy_service/kqp_proxy_service.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -297,7 +297,7 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
297297
MakeKqpCompileComputationPatternServiceID(SelfId().NodeId()), CompileComputationPatternService);
298298
}
299299

300-
auto scheduler = std::make_shared<NScheduler::TComputeScheduler>(Counters);
300+
auto scheduler = AppData()->ComputeScheduler = std::make_shared<NScheduler::TComputeScheduler>(Counters);
301301

302302
ResourceManager_ = GetKqpResourceManager();
303303
CaFactory_ = NComputeActor::MakeKqpCaFactory(

ydb/core/kqp/runtime/scheduler/new/fwd.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ namespace NKikimr::NKqp::NScheduler {
4343
}
4444

4545
struct TSchedulableTask;
46-
using TSchedulableTaskPtr = THolder<TSchedulableTask>;
46+
using TSchedulableTaskPtr = std::shared_ptr<TSchedulableTask>;
4747
using TSchedulableTaskFactory = std::function<TSchedulableTaskPtr(const NHdrf::TQueryId&)>;
4848

4949
} // namespace NKikimr::NKqp::NScheduler

ydb/core/kqp/runtime/scheduler/new/kqp_compute_scheduler_service.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ TComputeScheduler::TComputeScheduler(TIntrusivePtr<TKqpCounters> counters)
194194

195195
TSchedulableTaskFactory TComputeScheduler::CreateSchedulableTaskFactory() {
196196
return [ptr = this->shared_from_this()](const NHdrf::TQueryId& queryId) {
197-
return MakeHolder<TSchedulableTask>(ptr->GetQuery(queryId));
197+
return std::make_shared<TSchedulableTask>(ptr->GetQuery(queryId));
198198
};
199199
}
200200

ydb/core/kqp/runtime/scheduler/new/kqp_schedulable_actor.cpp

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,19 @@ void TSchedulableTask::DecreaseUsage(const TDuration& burstUsage) {
5757
}
5858
}
5959

60+
void TSchedulableTask::IncreaseExtraUsage() {
61+
for (TTreeElementBase* parent = Query.get(); parent; parent = parent->Parent) {
62+
++parent->UsageExtra;
63+
}
64+
}
65+
66+
void TSchedulableTask::DecreaseExtraUsage(const TDuration& burstUsageExtra) {
67+
for (TTreeElementBase* parent = Query.get(); parent; parent = parent->Parent) {
68+
--parent->UsageExtra;
69+
parent->BurstUsageExtra += burstUsageExtra.MicroSeconds();
70+
}
71+
}
72+
6073
void TSchedulableTask::IncreaseThrottle() {
6174
for (TTreeElementBase* parent = Query.get(); parent; parent = parent->Parent) {
6275
++parent->Throttle;

ydb/core/kqp/runtime/scheduler/new/kqp_schedulable_actor.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,10 @@ struct TSchedulableTask {
1818
void IncreaseUsage(const TDuration& burstThrottle);
1919
void DecreaseUsage(const TDuration& burstUsage);
2020

21+
// Account extra usage which doesn't affect scheduling
22+
void IncreaseExtraUsage();
23+
void DecreaseExtraUsage(const TDuration& burstUsage);
24+
2125
void IncreaseThrottle();
2226
void DecreaseThrottle();
2327

ydb/core/kqp/runtime/scheduler/new/tree/common.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,9 @@ namespace NKikimr::NKqp::NScheduler::NHdrf {
5151
NMonitoring::TDynamicCounters::TCounterPtr FairShare;
5252
NMonitoring::TDynamicCounters::TCounterPtr InFlight;
5353
NMonitoring::TDynamicCounters::TCounterPtr Waiting;
54+
55+
NMonitoring::TDynamicCounters::TCounterPtr InFlightExtra;
56+
NMonitoring::TDynamicCounters::TCounterPtr UsageExtra;
5457
};
5558

5659
} // namespace NKikimr::NKqp::NScheduler::NHdrf

ydb/core/kqp/runtime/scheduler/new/tree/dynamic.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,9 @@ TPool::TPool(const TString& id, const TIntrusivePtr<TKqpCounters>& counters, con
7575
Counters.Usage = group->GetCounter("Usage", true);
7676
Counters.Throttle = group->GetCounter("Throttle", true);
7777
Counters.FairShare = group->GetCounter("FairShare", true); // snapshot
78+
79+
Counters.InFlightExtra = group->GetCounter("InFlightExtra", false);
80+
Counters.UsageExtra = group->GetCounter("UsageExtra", true);
7881
}
7982

8083
NSnapshot::TPool* TPool::TakeSnapshot() const {

ydb/core/kqp/runtime/scheduler/new/tree/dynamic.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,12 @@ namespace NKikimr::NKqp::NScheduler::NHdrf::NDynamic {
3434

3535
struct TTreeElementBase : public TStaticAttributes {
3636
std::atomic<ui64> Usage = 0;
37+
std::atomic<ui64> UsageExtra = 0;
3738
std::atomic<ui64> Demand = 0;
3839
std::atomic<ui64> Throttle = 0;
3940

4041
std::atomic<ui64> BurstUsage = 0;
42+
std::atomic<ui64> BurstUsageExtra = 0;
4143
std::atomic<ui64> BurstThrottle = 0;
4244

4345
TTreeElementBase* Parent = nullptr;

ydb/core/tx/columnshard/columnshard_impl.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1213,11 +1213,13 @@ class TTxAskPortionChunks: public TTransactionBase<TColumnShard> {
12131213

12141214
AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("stage", "finished");
12151215
NConveyorComposite::TScanServiceOperator::SendTaskToExecute(
1216-
std::make_shared<TAccessorsParsingTask>(FetchCallback, std::move(FetchedAccessors)), 0);
1216+
std::make_shared<TAccessorsParsingTask>(FetchCallback, std::move(FetchedAccessors)), 0, {}); // TODO(scheduler)
12171217
return true;
12181218
}
1219+
12191220
void Complete(const TActorContext& /*ctx*/) override {
12201221
}
1222+
12211223
TTxType GetTxType() const override {
12221224
return TXTYPE_ASK_PORTION_METADATA;
12231225
}

0 commit comments

Comments
 (0)