Skip to content

Account usage from ColumnShards #19981

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions ydb/core/base/appdata_fwd.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ namespace NKikimr {
namespace NJaegerTracing {
class TSamplingThrottlingConfigurator;
}
namespace NKqp::NScheduler {
class TComputeScheduler;
}
}

namespace NKikimrCms {
Expand Down Expand Up @@ -303,6 +306,8 @@ struct TAppData {
// Tracing configurator (look for tracing config in ydb/core/jaeger_tracing/actors_tracing_control)
TIntrusivePtr<NKikimr::NJaegerTracing::TSamplingThrottlingConfigurator> TracingConfigurator;

std::shared_ptr<NKqp::NScheduler::TComputeScheduler> ComputeScheduler;

TAppData(
ui32 sysPoolId, ui32 userPoolId, ui32 ioPoolId, ui32 batchPoolId,
TMap<TString, ui32> servicePools,
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
MakeKqpCompileComputationPatternServiceID(SelfId().NodeId()), CompileComputationPatternService);
}

auto scheduler = std::make_shared<NScheduler::TComputeScheduler>(Counters);
auto scheduler = AppData()->ComputeScheduler = std::make_shared<NScheduler::TComputeScheduler>(Counters);

ResourceManager_ = GetKqpResourceManager();
CaFactory_ = NComputeActor::MakeKqpCaFactory(
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/runtime/scheduler/new/fwd.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ namespace NKikimr::NKqp::NScheduler {
}

struct TSchedulableTask;
using TSchedulableTaskPtr = THolder<TSchedulableTask>;
using TSchedulableTaskPtr = std::shared_ptr<TSchedulableTask>;
using TSchedulableTaskFactory = std::function<TSchedulableTaskPtr(const NHdrf::TQueryId&)>;

} // namespace NKikimr::NKqp::NScheduler
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ TComputeScheduler::TComputeScheduler(TIntrusivePtr<TKqpCounters> counters)

TSchedulableTaskFactory TComputeScheduler::CreateSchedulableTaskFactory() {
return [ptr = this->shared_from_this()](const NHdrf::TQueryId& queryId) {
return MakeHolder<TSchedulableTask>(ptr->GetQuery(queryId));
return std::make_shared<TSchedulableTask>(ptr->GetQuery(queryId));
};
}

Expand Down
13 changes: 13 additions & 0 deletions ydb/core/kqp/runtime/scheduler/new/kqp_schedulable_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,19 @@ void TSchedulableTask::DecreaseUsage(const TDuration& burstUsage) {
}
}

void TSchedulableTask::IncreaseExtraUsage() {
for (TTreeElementBase* parent = Query.get(); parent; parent = parent->Parent) {
++parent->UsageExtra;
}
}

void TSchedulableTask::DecreaseExtraUsage(const TDuration& burstUsageExtra) {
for (TTreeElementBase* parent = Query.get(); parent; parent = parent->Parent) {
--parent->UsageExtra;
parent->BurstUsageExtra += burstUsageExtra.MicroSeconds();
}
}

void TSchedulableTask::IncreaseThrottle() {
for (TTreeElementBase* parent = Query.get(); parent; parent = parent->Parent) {
++parent->Throttle;
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/kqp/runtime/scheduler/new/kqp_schedulable_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ struct TSchedulableTask {
void IncreaseUsage(const TDuration& burstThrottle);
void DecreaseUsage(const TDuration& burstUsage);

// Account for extra usage that doesn't affect scheduling
void IncreaseExtraUsage();
void DecreaseExtraUsage(const TDuration& burstUsage);

void IncreaseThrottle();
void DecreaseThrottle();

Expand Down
3 changes: 3 additions & 0 deletions ydb/core/kqp/runtime/scheduler/new/tree/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ namespace NKikimr::NKqp::NScheduler::NHdrf {
NMonitoring::TDynamicCounters::TCounterPtr FairShare;
NMonitoring::TDynamicCounters::TCounterPtr InFlight;
NMonitoring::TDynamicCounters::TCounterPtr Waiting;

NMonitoring::TDynamicCounters::TCounterPtr InFlightExtra;
NMonitoring::TDynamicCounters::TCounterPtr UsageExtra;
};

} // namespace NKikimr::NKqp::NScheduler::NHdrf
3 changes: 3 additions & 0 deletions ydb/core/kqp/runtime/scheduler/new/tree/dynamic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ TPool::TPool(const TString& id, const TIntrusivePtr<TKqpCounters>& counters, con
Counters.Usage = group->GetCounter("Usage", true);
Counters.Throttle = group->GetCounter("Throttle", true);
Counters.FairShare = group->GetCounter("FairShare", true); // snapshot

Counters.InFlightExtra = group->GetCounter("InFlightExtra", false);
Counters.UsageExtra = group->GetCounter("UsageExtra", true);
}

NSnapshot::TPool* TPool::TakeSnapshot() const {
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/kqp/runtime/scheduler/new/tree/dynamic.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,12 @@ namespace NKikimr::NKqp::NScheduler::NHdrf::NDynamic {

struct TTreeElementBase : public TStaticAttributes {
std::atomic<ui64> Usage = 0;
std::atomic<ui64> UsageExtra = 0;
std::atomic<ui64> Demand = 0;
std::atomic<ui64> Throttle = 0;

std::atomic<ui64> BurstUsage = 0;
std::atomic<ui64> BurstUsageExtra = 0;
std::atomic<ui64> BurstThrottle = 0;

TTreeElementBase* Parent = nullptr;
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/tx/columnshard/columnshard_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1213,11 +1213,13 @@ class TTxAskPortionChunks: public TTransactionBase<TColumnShard> {

AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("stage", "finished");
NConveyorComposite::TScanServiceOperator::SendTaskToExecute(
std::make_shared<TAccessorsParsingTask>(FetchCallback, std::move(FetchedAccessors)), 0);
std::make_shared<TAccessorsParsingTask>(FetchCallback, std::move(FetchedAccessors)), 0, {}); // TODO(scheduler)
return true;
}

// Completion hook: the body is empty, so no work is performed here.
void Complete(const TActorContext& /*ctx*/) override {}

// Returns the transaction type tag, which is always TXTYPE_ASK_PORTION_METADATA.
TTxType GetTxType() const override {
    const TTxType txType = TXTYPE_ASK_PORTION_METADATA;
    return txType;
}
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/data_reader/fetcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ class TPortionsDataFetcher: TNonCopyable {
if (IsFinishedFlag) {
return false;
}
NConveyorComposite::TServiceOperator::SendTaskToExecute(std::make_shared<TFetchingExecutor>(selfPtr), ConveyorCategory, 0);
NConveyorComposite::TServiceOperator::SendTaskToExecute(std::make_shared<TFetchingExecutor>(selfPtr), ConveyorCategory, 0, {}); // TODO(scheduler)
return true;
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@ TReadContext::TReadContext(const std::shared_ptr<IStoragesManager>& storagesMana
const std::shared_ptr<NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager,
const NColumnShard::TConcreteScanCounters& counters, const TReadMetadataBase::TConstPtr& readMetadata, const TActorId& scanActorId,
const TActorId& resourceSubscribeActorId, const TActorId& readCoordinatorActorId, const TComputeShardingPolicy& computeShardingPolicy,
const ui64 scanId, const NConveyorComposite::TCPULimitsConfig& cpuLimits)
const ui64 scanId, const NConveyorComposite::TCPULimitsConfig& cpuLimits,
NKqp::NScheduler::TSchedulableTaskPtr schedulableTask)
: StoragesManager(storagesManager)
, DataAccessorsManager(dataAccessorsManager)
, SchedulableTask(schedulableTask)
, Counters(counters)
, ReadMetadata(readMetadata)
, ResourcesTaskContext("CS::SCAN_READ", counters.ResourcesSubscriberCounters)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once
#include "read_metadata.h"

#include <ydb/core/kqp/runtime/scheduler/new/fwd.h>
#include <ydb/core/protos/tx_datashard.pb.h>
#include <ydb/core/tx/columnshard/blobs_action/abstract/storages_manager.h>
#include <ydb/core/tx/columnshard/counters/scan.h>
Expand Down Expand Up @@ -49,6 +50,7 @@ class TReadContext {
private:
YDB_READONLY_DEF(std::shared_ptr<IStoragesManager>, StoragesManager);
YDB_READONLY_DEF(std::shared_ptr<NDataAccessorControl::IDataAccessorsManager>, DataAccessorsManager);
YDB_READONLY_DEF(NKqp::NScheduler::TSchedulableTaskPtr, SchedulableTask);
const NColumnShard::TConcreteScanCounters Counters;
TReadMetadataBase::TConstPtr ReadMetadata;
NResourceBroker::NSubscribe::TTaskContext ResourcesTaskContext;
Expand Down Expand Up @@ -150,7 +152,8 @@ class TReadContext {
const std::shared_ptr<NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager,
const NColumnShard::TConcreteScanCounters& counters, const TReadMetadataBase::TConstPtr& readMetadata, const TActorId& scanActorId,
const TActorId& resourceSubscribeActorId, const TActorId& readCoordinatorActorId, const TComputeShardingPolicy& computeShardingPolicy,
const ui64 scanId, const NConveyorComposite::TCPULimitsConfig& cpuLimits);
const ui64 scanId, const NConveyorComposite::TCPULimitsConfig& cpuLimits,
NKqp::NScheduler::TSchedulableTaskPtr schedulableTask);
};

class IDataReader {
Expand Down
6 changes: 4 additions & 2 deletions ydb/core/tx/columnshard/engines/reader/actor/actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ TColumnShardScan::TColumnShardScan(const TActorId& columnShardActorId, const TAc
const std::shared_ptr<NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager,
const TComputeShardingPolicy& computeShardingPolicy, ui32 scanId, ui64 txId, ui32 scanGen, ui64 requestCookie, ui64 tabletId,
TDuration timeout, const TReadMetadataBase::TConstPtr& readMetadataRange, NKikimrDataEvents::EDataFormat dataFormat,
const NColumnShard::TScanCounters& scanCountersPool, const NConveyorComposite::TCPULimitsConfig& cpuLimits)
const NColumnShard::TScanCounters& scanCountersPool, const NConveyorComposite::TCPULimitsConfig& cpuLimits,
NKqp::NScheduler::TSchedulableTaskPtr schedulableTask)
: StoragesManager(storagesManager)
, DataAccessorsManager(dataAccessorsManager)
, ColumnShardActorId(columnShardActorId)
Expand All @@ -34,6 +35,7 @@ TColumnShardScan::TColumnShardScan(const TActorId& columnShardActorId, const TAc
, DataFormat(dataFormat)
, TabletId(tabletId)
, CPULimits(cpuLimits)
, SchedulableTask(schedulableTask)
, ReadMetadataRange(readMetadataRange)
, Timeout(timeout ? timeout : COMPUTE_HARD_TIMEOUT)
, ScanCountersPool(scanCountersPool, TValidator::CheckNotNull(ReadMetadataRange)->GetProgram().GetGraphOptional())
Expand All @@ -54,7 +56,7 @@ void TColumnShardScan::Bootstrap(const TActorContext& ctx) {
ReadCoordinatorActorId = ctx.Register(new NBlobOperations::NRead::TReadCoordinatorActor(TabletId, SelfId()));

std::shared_ptr<TReadContext> context = std::make_shared<TReadContext>(StoragesManager, DataAccessorsManager, ScanCountersPool,
ReadMetadataRange, SelfId(), ResourceSubscribeActorId, ReadCoordinatorActorId, ComputeShardingPolicy, ScanId, CPULimits);
ReadMetadataRange, SelfId(), ResourceSubscribeActorId, ReadCoordinatorActorId, ComputeShardingPolicy, ScanId, CPULimits, SchedulableTask);
ScanIterator = ReadMetadataRange->StartScan(context);
auto startResult = ScanIterator->Start();
StartInstant = TMonotonic::Now();
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/tx/columnshard/engines/reader/actor/actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ class TColumnShardScan: public TActorBootstrapped<TColumnShardScan>,
const std::shared_ptr<NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager,
const TComputeShardingPolicy& computeShardingPolicy, ui32 scanId, ui64 txId, ui32 scanGen, ui64 requestCookie, ui64 tabletId,
TDuration timeout, const TReadMetadataBase::TConstPtr& readMetadataRange, NKikimrDataEvents::EDataFormat dataFormat,
const NColumnShard::TScanCounters& scanCountersPool, const NConveyorComposite::TCPULimitsConfig& cpuLimits);
const NColumnShard::TScanCounters& scanCountersPool, const NConveyorComposite::TCPULimitsConfig& cpuLimits,
NKqp::NScheduler::TSchedulableTaskPtr schedulableTask);

void Bootstrap(const TActorContext& ctx);

Expand Down Expand Up @@ -131,6 +132,7 @@ class TColumnShardScan: public TActorBootstrapped<TColumnShardScan>,
const NKikimrDataEvents::EDataFormat DataFormat;
const ui64 TabletId;
const NConveyorComposite::TCPULimitsConfig CPULimits;
NKqp::NScheduler::TSchedulableTaskPtr SchedulableTask;

TReadMetadataBase::TConstPtr ReadMetadataRange;
std::unique_ptr<TScanIteratorBase> ScanIterator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ void TBlobsFetcherTask::DoOnDataReady(const std::shared_ptr<NResourceBroker::NSu
FOR_DEBUG_LOG(NKikimrServices::COLUMNSHARD_SCAN_EVLOG, Source->AddEvent("fbf"));
Source->MutableStageData().AddBlobs(Source->DecodeBlobAddresses(ExtractBlobsData()));
AFL_VERIFY(Step.Next());
auto task = std::make_shared<TStepAction>(Source, std::move(Step), Context->GetCommonContext()->GetScanActorId(), false);
NConveyorComposite::TScanServiceOperator::SendTaskToExecute(task, Context->GetCommonContext()->GetConveyorProcessId());
const auto& commonContext = *Context->GetCommonContext();
auto task = std::make_shared<TStepAction>(Source, std::move(Step), commonContext.GetScanActorId(), false);
NConveyorComposite::TScanServiceOperator::SendTaskToExecute(task, commonContext.GetConveyorProcessId(), commonContext.GetSchedulableTask());
}

bool TBlobsFetcherTask::DoOnError(const TString& storageId, const TBlobRange& range, const IBlobsReadingAction::TErrorStatus& status) {
Expand Down Expand Up @@ -58,8 +59,9 @@ void TColumnsFetcherTask::DoOnDataReady(const std::shared_ptr<NResourceBroker::N
for (auto&& i : DataFetchers) {
Source->MutableStageData().AddFetcher(i.second);
}
auto task = std::make_shared<TStepAction>(Source, std::move(Cursor), Source->GetContext()->GetCommonContext()->GetScanActorId(), false);
NConveyorComposite::TScanServiceOperator::SendTaskToExecute(task, Source->GetContext()->GetCommonContext()->GetConveyorProcessId());
const auto& commonContext = *Source->GetContext()->GetCommonContext();
auto task = std::make_shared<TStepAction>(Source, std::move(Cursor), commonContext.GetScanActorId(), false);
NConveyorComposite::TScanServiceOperator::SendTaskToExecute(task, commonContext.GetConveyorProcessId(), commonContext.GetSchedulableTask());
} else {
FOR_DEBUG_LOG(NKikimrServices::COLUMNSHARD_SCAN_EVLOG, Source->AddEvent("cf_next"));
std::shared_ptr<TColumnsFetcherTask> nextReadTask = std::make_shared<TColumnsFetcherTask>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,9 @@ bool TAllocateMemoryStep::TFetchingStepAllocation::DoOnAllocated(std::shared_ptr
Step.Next();
}
FOR_DEBUG_LOG(NKikimrServices::COLUMNSHARD_SCAN_EVLOG, data->AddEvent("fmalloc"));
auto task = std::make_shared<TStepAction>(data, std::move(Step), data->GetContext()->GetCommonContext()->GetScanActorId(), false);
NConveyorComposite::TScanServiceOperator::SendTaskToExecute(task, data->GetContext()->GetCommonContext()->GetConveyorProcessId());
const auto& commonContext = *data->GetContext()->GetCommonContext();
auto task = std::make_shared<TStepAction>(data, std::move(Step), commonContext.GetScanActorId(), false);
NConveyorComposite::TScanServiceOperator::SendTaskToExecute(task, commonContext.GetConveyorProcessId(), commonContext.GetSchedulableTask());
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ TConclusion<bool> TDetectInMem::DoExecuteInplace(const std::shared_ptr<IDataSour
TFetchingScriptCursor cursor(plan, 0);
const auto& commonContext = *source->GetContext()->GetCommonContext();
auto task = std::make_shared<TStepAction>(source, std::move(cursor), commonContext.GetScanActorId(), false);
NConveyorComposite::TScanServiceOperator::SendTaskToExecute(task, commonContext.GetConveyorProcessId());
NConveyorComposite::TScanServiceOperator::SendTaskToExecute(task, commonContext.GetConveyorProcessId(), commonContext.GetSchedulableTask());
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,9 @@ bool TBaseMergeTask::DoOnAllocated(
return false;
}
AllocationGuard = std::move(guard);
const auto& commonContext = *Context->GetCommonContext();
NConveyorComposite::TScanServiceOperator::SendTaskToExecute(
static_pointer_cast<TBaseMergeTask>(allocation), Context->GetCommonContext()->GetConveyorProcessId());
static_pointer_cast<TBaseMergeTask>(allocation), commonContext.GetConveyorProcessId(), commonContext.GetSchedulableTask());
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ void IDataSource::RegisterInterval(TFetchingInterval& interval, const std::share
TFetchingScriptCursor cursor(FetchingPlan, 0);
const auto& commonContext = *GetContext()->GetCommonContext();
auto task = std::make_shared<TStepAction>(sourcePtr, std::move(cursor), commonContext.GetScanActorId(), true);
NConveyorComposite::TScanServiceOperator::SendTaskToExecute(task, commonContext.GetConveyorProcessId());
NConveyorComposite::TScanServiceOperator::SendTaskToExecute(task, commonContext.GetConveyorProcessId(), commonContext.GetSchedulableTask());
}
}

Expand Down Expand Up @@ -175,7 +175,7 @@ class TPortionAccessorFetchingSubscriber: public IDataAccessorRequestsSubscriber
AFL_VERIFY(Step.Next());
const auto& commonContext = *Source->GetContext()->GetCommonContext();
auto task = std::make_shared<TStepAction>(Source, std::move(Step), commonContext.GetScanActorId(), false);
NConveyorComposite::TScanServiceOperator::SendTaskToExecute(task, commonContext.GetConveyorProcessId());
NConveyorComposite::TScanServiceOperator::SendTaskToExecute(task, commonContext.GetConveyorProcessId(), commonContext.GetSchedulableTask());
}

public:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ TConclusion<bool> TDetectInMem::DoExecuteInplace(const std::shared_ptr<IDataSour
FOR_DEBUG_LOG(NKikimrServices::COLUMNSHARD_SCAN_EVLOG, source->AddEvent("sdmem"));
const auto& commonContext = *source->GetContext()->GetCommonContext();
auto task = std::make_shared<TStepAction>(source, std::move(cursor), commonContext.GetScanActorId(), false);
NConveyorComposite::TScanServiceOperator::SendTaskToExecute(task, commonContext.GetConveyorProcessId());
NConveyorComposite::TScanServiceOperator::SendTaskToExecute(task, commonContext.GetConveyorProcessId(), commonContext.GetSchedulableTask());
return false;
}

Expand Down Expand Up @@ -186,7 +186,7 @@ TConclusion<bool> TPrepareResultStep::DoExecuteInplace(const std::shared_ptr<IDa
TFetchingScriptCursor cursor(plan, 0);
const auto& commonContext = *context->GetCommonContext();
auto task = std::make_shared<TStepAction>(source, std::move(cursor), commonContext.GetScanActorId(), false);
NConveyorComposite::TScanServiceOperator::SendTaskToExecute(task, commonContext.GetConveyorProcessId());
NConveyorComposite::TScanServiceOperator::SendTaskToExecute(task, commonContext.GetConveyorProcessId(), commonContext.GetSchedulableTask());
return false;
} else {
return true;
Expand All @@ -204,8 +204,9 @@ void TDuplicateFilter::TFilterSubscriber::OnFilterReady(NArrow::TColumnFilter&&
}
source->MutableStageData().AddFilter(std::move(filter));
Step.Next();
auto task = std::make_shared<TStepAction>(source, std::move(Step), source->GetContext()->GetCommonContext()->GetScanActorId(), false);
NConveyorComposite::TScanServiceOperator::SendTaskToExecute(task, source->GetContext()->GetCommonContext()->GetConveyorProcessId());
const auto& commonContext = *source->GetContext()->GetCommonContext();
auto task = std::make_shared<TStepAction>(source, std::move(Step), commonContext.GetScanActorId(), false);
NConveyorComposite::TScanServiceOperator::SendTaskToExecute(task, commonContext.GetConveyorProcessId(), commonContext.GetSchedulableTask());
}
}

Expand Down
Loading
Loading