Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions ydb/core/tx/datashard/datashard_kqp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1020,6 +1020,10 @@ class TKqpTaskRunnerExecutionContext: public NDq::IDqTaskRunnerExecutionContext
return {};
}

TIntrusivePtr<NDq::TSpillingTaskCounters> GetSpillingTaskCounters() const override {
return {};
}

NDq::TTxId GetTxId() const override {
return {};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,14 @@ void FillTaskRunnerStats(ui64 taskId, ui32 stageId, const TTaskRunnerStatsBase&
protoTask->SetWaitInputTimeUs(taskStats.WaitInputTime.MicroSeconds());
protoTask->SetWaitOutputTimeUs(taskStats.WaitOutputTime.MicroSeconds());

protoTask->SetSpillingComputeWriteBytes(taskStats.SpillingComputeWriteBytes);
protoTask->SetSpillingChannelWriteBytes(taskStats.SpillingChannelWriteBytes);

protoTask->SetSpillingComputeReadTimeUs(taskStats.SpillingComputeReadTime.MicroSeconds());
protoTask->SetSpillingComputeWriteTimeUs(taskStats.SpillingComputeWriteTime.MicroSeconds());
protoTask->SetSpillingChannelReadTimeUs(taskStats.SpillingChannelReadTime.MicroSeconds());
protoTask->SetSpillingChannelWriteTimeUs(taskStats.SpillingChannelWriteTime.MicroSeconds());

if (StatsLevelCollectProfile(level)) {
if (taskStats.ComputeCpuTimeByRun) {
auto snapshot = taskStats.ComputeCpuTimeByRun->Snapshot();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,13 +209,13 @@ class TDqSyncComputeActorBase: public TDqComputeActorBase<TDerived, TComputeActo
if (!limits.OutputChunkMaxSize) {
limits.OutputChunkMaxSize = GetDqExecutionSettings().FlowControl.MaxOutputChunkSize;
}

TaskRunner->Prepare(this->Task, limits, execCtx);


if (this->Task.GetEnableSpilling()) {
TaskRunner->SetSpillerFactory(std::make_shared<TDqSpillerFactory>(execCtx.GetTxId(), NActors::TActivationContext::ActorSystem(), execCtx.GetWakeupCallback(), execCtx.GetErrorCallback()));
}

TaskRunner->Prepare(this->Task, limits, execCtx);

for (auto& [channelId, channel] : this->InputChannelsMap) {
channel.Channel = TaskRunner->GetInputChannel(channelId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,17 @@ TDqTaskRunnerExecutionContext::TDqTaskRunnerExecutionContext(TTxId txId, TWakeUp
: TxId_(txId)
, WakeUpCallback_(std::move(wakeUpCallback))
, ErrorCallback_(std::move(errorCallback))
, SpillingTaskCounters_(MakeIntrusive<TSpillingTaskCounters>())
{
}

IDqChannelStorage::TPtr TDqTaskRunnerExecutionContext::CreateChannelStorage(ui64 channelId, bool withSpilling) const {
return CreateChannelStorage(channelId, withSpilling, NActors::TlsActivationContext->ActorSystem());
}

IDqChannelStorage::TPtr TDqTaskRunnerExecutionContext::CreateChannelStorage(ui64 channelId, bool withSpilling, NActors::TActorSystem* actorSystem) const {
IDqChannelStorage::TPtr TDqTaskRunnerExecutionContext::CreateChannelStorage(ui64 channelId, bool withSpilling, NActors::TActorSystem* actorSystem) const {
if (withSpilling) {
return CreateDqChannelStorage(TxId_, channelId, WakeUpCallback_, ErrorCallback_, actorSystem);
return CreateDqChannelStorage(TxId_, channelId, WakeUpCallback_, ErrorCallback_, SpillingTaskCounters_, actorSystem);
} else {
return nullptr;
}
Expand All @@ -33,6 +34,10 @@ TErrorCallback TDqTaskRunnerExecutionContext::GetErrorCallback() const {
return ErrorCallback_;
}

TIntrusivePtr<TSpillingTaskCounters> TDqTaskRunnerExecutionContext::GetSpillingTaskCounters() const {
return SpillingTaskCounters_;
}

TTxId TDqTaskRunnerExecutionContext::GetTxId() const {
return TxId_;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,17 @@ class TDqTaskRunnerExecutionContext : public TDqTaskRunnerExecutionContextBase {

IDqChannelStorage::TPtr CreateChannelStorage(ui64 channelId, bool withSpilling) const override;
IDqChannelStorage::TPtr CreateChannelStorage(ui64 channelId, bool withSpilling, NActors::TActorSystem* actorSystem) const override;

TWakeUpCallback GetWakeupCallback() const override;
TErrorCallback GetErrorCallback() const override;
TIntrusivePtr<TSpillingTaskCounters> GetSpillingTaskCounters() const override;
TTxId GetTxId() const override;

private:
const TTxId TxId_;
const TWakeUpCallback WakeUpCallback_;
const TErrorCallback ErrorCallback_;
const TIntrusivePtr<TSpillingTaskCounters> SpillingTaskCounters_;
};

} // namespace NDq
Expand Down
8 changes: 8 additions & 0 deletions ydb/library/yql/dq/actors/protos/dq_stats.proto
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,14 @@ message TDqTaskStats {
repeated TDqOutputChannelStats OutputChannels = 153;
repeated TDqAsyncInputBufferStats InputTransforms = 155;

uint64 SpillingComputeWriteBytes = 160;
uint64 SpillingChannelWriteBytes = 161;

uint64 SpillingComputeReadTimeUs = 162;
uint64 SpillingComputeWriteTimeUs = 163;
uint64 SpillingChannelReadTimeUs = 164;
uint64 SpillingChannelWriteTimeUs = 165;

// profile stats
repeated TDqTableStats Tables = 10;

Expand Down
9 changes: 6 additions & 3 deletions ydb/library/yql/dq/actors/spilling/channel_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,11 @@ class TDqChannelStorage : public IDqChannelStorage {
NThreading::TFuture<void> IsBlobWrittenFuture_;
};
public:
TDqChannelStorage(TTxId txId, ui64 channelId, TWakeUpCallback&& wakeUpCallback, TErrorCallback&& errorCallback, TActorSystem* actorSystem)
TDqChannelStorage(TTxId txId, ui64 channelId, TWakeUpCallback&& wakeUpCallback, TErrorCallback&& errorCallback,
TIntrusivePtr<TSpillingTaskCounters> spillingTaskCounters, TActorSystem* actorSystem)
: ActorSystem_(actorSystem)
{
ChannelStorageActor_ = CreateDqChannelStorageActor(txId, channelId, std::move(wakeUpCallback), std::move(errorCallback), actorSystem);
ChannelStorageActor_ = CreateDqChannelStorageActor(txId, channelId, std::move(wakeUpCallback), std::move(errorCallback), spillingTaskCounters, actorSystem);
ChannelStorageActorId_ = ActorSystem_->Register(ChannelStorageActor_->GetActor());
}

Expand Down Expand Up @@ -119,12 +120,14 @@ class TDqChannelStorage : public IDqChannelStorage {

} // anonymous namespace


IDqChannelStorage::TPtr CreateDqChannelStorage(TTxId txId, ui64 channelId,
TWakeUpCallback wakeUpCallback,
TErrorCallback errorCallback,
TIntrusivePtr<TSpillingTaskCounters> spillingTaskCounters,
TActorSystem* actorSystem)
{
return new TDqChannelStorage(txId, channelId, std::move(wakeUpCallback), std::move(errorCallback), actorSystem);
return new TDqChannelStorage(txId, channelId, std::move(wakeUpCallback), std::move(errorCallback), spillingTaskCounters, actorSystem);
}

} // namespace NYql::NDq
3 changes: 3 additions & 0 deletions ydb/library/yql/dq/actors/spilling/channel_storage.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#pragma once

#include "spilling_counters.h"

#include <ydb/library/yql/dq/common/dq_common.h>
#include <ydb/library/yql/dq/runtime/dq_channel_storage.h>
#include <ydb/library/actors/core/actor.h>
Expand All @@ -13,6 +15,7 @@ namespace NYql::NDq {
IDqChannelStorage::TPtr CreateDqChannelStorage(TTxId txId, ui64 channelId,
TWakeUpCallback wakeUpCallback,
TErrorCallback errorCallback,
TIntrusivePtr<TSpillingTaskCounters> spillingTaskCounters,
NActors::TActorSystem* actorSystem);

} // namespace NYql::NDq
54 changes: 45 additions & 9 deletions ydb/library/yql/dq/actors/spilling/channel_storage_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,26 @@ class TDqChannelStorageActor : public IDqChannelStorageActor,
public NActors::TActorBootstrapped<TDqChannelStorageActor>
{
using TBase = TActorBootstrapped<TDqChannelStorageActor>;

struct TWritingBlobInfo {
ui64 Size;
NThreading::TPromise<void> SavePromise;
TInstant OpBegin;
};

struct TLoadingBlobInfo {
NThreading::TPromise<TBuffer> BlobPromise;
TInstant OpBegin;
};
public:

TDqChannelStorageActor(TTxId txId, ui64 channelId, TWakeUpCallback&& wakeUpCallback, TErrorCallback&& errorCallback, TActorSystem* actorSystem)
TDqChannelStorageActor(TTxId txId, ui64 channelId, TWakeUpCallback&& wakeUpCallback, TErrorCallback&& errorCallback,
TIntrusivePtr<TSpillingTaskCounters> spillingTaskCounters, TActorSystem* actorSystem)
: TxId_(txId)
, ChannelId_(channelId)
, WakeUpCallback_(std::move(wakeUpCallback))
, ErrorCallback_(std::move(errorCallback))
, SpillingTaskCounters_(spillingTaskCounters)
, ActorSystem_(actorSystem)
{}

Expand Down Expand Up @@ -101,8 +114,11 @@ class TDqChannelStorageActor : public IDqChannelStorageActor,
void HandleWork(TEvDqChannelSpilling::TEvGet::TPtr& ev) {
auto& msg = *ev->Get();
LOG_T("[TEvGet] blobId: " << msg.BlobId_);

auto opBegin = TInstant::Now();

LoadingBlobs_.emplace(msg.BlobId_, std::move(msg.Promise_));
auto loadingBlobInfo = TLoadingBlobInfo{std::move(msg.Promise_), opBegin};
LoadingBlobs_.emplace(msg.BlobId_, std::move(loadingBlobInfo));

SendInternal(SpillingActorId_, new TEvDqSpilling::TEvRead(msg.BlobId_));
}
Expand All @@ -111,7 +127,10 @@ class TDqChannelStorageActor : public IDqChannelStorageActor,
auto& msg = *ev->Get();
LOG_T("[TEvPut] blobId: " << msg.BlobId_);

WritingBlobs_.emplace(msg.BlobId_, std::move(msg.Promise_));
auto opBegin = TInstant::Now();

auto writingBlobInfo = TWritingBlobInfo{msg.Blob_.size(), std::move(msg.Promise_), opBegin};
WritingBlobs_.emplace(msg.BlobId_, std::move(writingBlobInfo));

SendInternal(SpillingActorId_, new TEvDqSpilling::TEvWrite(msg.BlobId_, std::move(msg.Blob_)));
}
Expand All @@ -126,8 +145,15 @@ class TDqChannelStorageActor : public IDqChannelStorageActor,
return;
}

auto& blobInfo = it->second;

if (SpillingTaskCounters_) {
SpillingTaskCounters_->ChannelWriteBytes += blobInfo.Size;
auto opDuration = TInstant::Now() - blobInfo.OpBegin;
SpillingTaskCounters_->ChannelWriteTime += opDuration.MilliSeconds();
}
// Complete the future
it->second.SetValue();
blobInfo.SavePromise.SetValue();
WritingBlobs_.erase(it);

WakeUpCallback_();
Expand All @@ -143,7 +169,14 @@ class TDqChannelStorageActor : public IDqChannelStorageActor,
return;
}

it->second.SetValue(std::move(msg.Blob));
auto& blobInfo = it->second;

if (SpillingTaskCounters_) {
auto opDuration = TInstant::Now() - blobInfo.OpBegin;
SpillingTaskCounters_->ChannelReadTime += opDuration.MilliSeconds();
}

blobInfo.BlobPromise.SetValue(std::move(msg.Blob));
LoadingBlobs_.erase(it);

WakeUpCallback_();
Expand All @@ -163,15 +196,17 @@ class TDqChannelStorageActor : public IDqChannelStorageActor,
private:
const TTxId TxId_;
const ui64 ChannelId_;

TWakeUpCallback WakeUpCallback_;
TErrorCallback ErrorCallback_;
TIntrusivePtr<TSpillingTaskCounters> SpillingTaskCounters_;
TActorId SpillingActorId_;

// BlobId -> promise that blob is saved
std::unordered_map<ui64, NThreading::TPromise<void>> WritingBlobs_;
// BlobId -> blob size + promise that blob is saved
std::unordered_map<ui64, TWritingBlobInfo> WritingBlobs_;

// BlobId -> promise with requested blob
std::unordered_map<ui64, NThreading::TPromise<TBuffer>> LoadingBlobs_;
std::unordered_map<ui64, TLoadingBlobInfo> LoadingBlobs_;

TActorSystem* ActorSystem_;
};
Expand All @@ -181,9 +216,10 @@ class TDqChannelStorageActor : public IDqChannelStorageActor,
IDqChannelStorageActor* CreateDqChannelStorageActor(TTxId txId, ui64 channelId,
TWakeUpCallback&& wakeUpCallback,
TErrorCallback&& errorCallback,
TIntrusivePtr<TSpillingTaskCounters> spillingTaskCounters,
NActors::TActorSystem* actorSystem)
{
return new TDqChannelStorageActor(txId, channelId, std::move(wakeUpCallback), std::move(errorCallback), actorSystem);
return new TDqChannelStorageActor(txId, channelId, std::move(wakeUpCallback), std::move(errorCallback), spillingTaskCounters, actorSystem);
}

} // namespace NYql::NDq
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#include "spilling_counters.h"

#include <ydb/library/yql/dq/runtime/dq_channel_storage.h>
#include "ydb/library/yql/dq/common/dq_common.h"

Expand Down Expand Up @@ -49,6 +51,8 @@ class IDqChannelStorageActor
virtual NActors::IActor* GetActor() = 0;
};

IDqChannelStorageActor* CreateDqChannelStorageActor(TTxId txId, ui64 channelId, TWakeUpCallback&& wakeUpCallback, TErrorCallback&& errorCallback, NActors::TActorSystem* actorSystem);

IDqChannelStorageActor* CreateDqChannelStorageActor(TTxId txId, ui64 channelId, TWakeUpCallback&& wakeUpCallback, TErrorCallback&& errorCallback,
TIntrusivePtr<TSpillingTaskCounters> spillingTaskCounters, NActors::TActorSystem* actorSystem);

} // namespace NYql::NDq
5 changes: 3 additions & 2 deletions ydb/library/yql/dq/actors/spilling/compute_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ namespace NYql::NDq {

using namespace NActors;

TDqComputeStorage::TDqComputeStorage(TTxId txId, TWakeUpCallback wakeUpCallback, TErrorCallback errorCallback, TActorSystem* actorSystem) : ActorSystem_(actorSystem) {
TDqComputeStorage::TDqComputeStorage(TTxId txId, TWakeUpCallback wakeUpCallback, TErrorCallback errorCallback,
TIntrusivePtr<TSpillingTaskCounters> spillingTaskCounters, TActorSystem* actorSystem) : ActorSystem_(actorSystem) {
TStringStream spillerName;
spillerName << "Spiller" << "_" << CreateGuidAsString();
ComputeStorageActor_ = CreateDqComputeStorageActor(txId, spillerName.Str(), wakeUpCallback, errorCallback);
ComputeStorageActor_ = CreateDqComputeStorageActor(txId, spillerName.Str(), wakeUpCallback, errorCallback, spillingTaskCounters);
ComputeStorageActorId_ = ActorSystem_->Register(ComputeStorageActor_->GetActor());
}

Expand Down
4 changes: 2 additions & 2 deletions ydb/library/yql/dq/actors/spilling/compute_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ namespace NYql::NDq {
class TDqComputeStorage : public NKikimr::NMiniKQL::ISpiller
{
public:

TDqComputeStorage(TTxId txId, TWakeUpCallback wakeUpCallback, TErrorCallback errorCallback, NActors::TActorSystem* actorSystem);
TDqComputeStorage(TTxId txId, TWakeUpCallback wakeUpCallback, TErrorCallback errorCallback,
TIntrusivePtr<TSpillingTaskCounters> spillingTaskCounters, NActors::TActorSystem* actorSystem);

~TDqComputeStorage();

Expand Down
Loading