Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions ydb/core/kqp/compute_actor/kqp_compute_actor_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ using namespace NYql::NDq;

class TKqpTaskRunnerExecutionContext : public TDqTaskRunnerExecutionContext {
public:
TKqpTaskRunnerExecutionContext(ui64 txId, bool withSpilling, IDqChannelStorage::TWakeUpCallback&& wakeUp)
: TDqTaskRunnerExecutionContext(txId, std::move(wakeUp))
TKqpTaskRunnerExecutionContext(ui64 txId, bool withSpilling, TWakeUpCallback&& wakeUpCallback, TErrorCallback&& errorCallback)
: TDqTaskRunnerExecutionContext(txId, std::move(wakeUpCallback), std::move(errorCallback))
, WithSpilling_(withSpilling)
{
}
Expand Down
5 changes: 3 additions & 2 deletions ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,10 @@ void TKqpComputeActor::DoBootstrap() {
auto taskRunner = MakeDqTaskRunner(TBase::GetAllocatorPtr(), execCtx, settings, logger);
SetTaskRunner(taskRunner);

auto wakeup = [this]{ ContinueExecute(); };
auto wakeupCallback = [this]{ ContinueExecute(); };
auto errorCallback = [this](const TString& error){ SendError(error); };
try {
PrepareTaskRunner(TKqpTaskRunnerExecutionContext(std::get<ui64>(TxId), RuntimeSettings.UseSpilling, std::move(wakeup)));
PrepareTaskRunner(TKqpTaskRunnerExecutionContext(std::get<ui64>(TxId), RuntimeSettings.UseSpilling, std::move(wakeupCallback), std::move(errorCallback)));
} catch (const NMiniKQL::TKqpEnsureFail& e) {
InternalError((TIssuesIds::EIssueCode) e.GetCode(), e.GetMessage());
return;
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,8 @@ void TKqpScanComputeActor::DoBootstrap() {
TBase::SetTaskRunner(taskRunner);

auto wakeup = [this] { ContinueExecute(); };
TBase::PrepareTaskRunner(TKqpTaskRunnerExecutionContext(std::get<ui64>(TxId), RuntimeSettings.UseSpilling, std::move(wakeup)));
auto errorCallback = [this](const TString& error){ SendError(error); };
TBase::PrepareTaskRunner(TKqpTaskRunnerExecutionContext(std::get<ui64>(TxId), RuntimeSettings.UseSpilling, std::move(wakeup), std::move(errorCallback)));

ComputeCtx.AddTableScan(0, Meta, GetStatsMode());
ScanData = &ComputeCtx.GetTableScan(0);
Expand Down
64 changes: 43 additions & 21 deletions ydb/core/kqp/ut/spilling/kqp_scan_spilling_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ NKikimrConfig::TAppConfig AppCfg() {
return appCfg;
}

NKikimrConfig::TAppConfig AppCfgLowComputeLimits(double reasonableTreshold) {
NKikimrConfig::TAppConfig AppCfgLowComputeLimits(double reasonableTreshold, bool enableSpilling=true) {
NKikimrConfig::TAppConfig appCfg;

auto* rm = appCfg.MutableTableServiceConfig()->MutableResourceManager();
Expand All @@ -43,12 +43,32 @@ NKikimrConfig::TAppConfig AppCfgLowComputeLimits(double reasonableTreshold) {

auto* spilling = appCfg.MutableTableServiceConfig()->MutableSpillingServiceConfig()->MutableLocalFileConfig();

spilling->SetEnable(true);
spilling->SetEnable(enableSpilling);
spilling->SetRoot("./spilling/");

return appCfg;
}

void FillTableWithData(NQuery::TQueryClient& db, ui64 numRows=300) {
for (ui32 i = 0; i < numRows; ++i) {
auto result = db.ExecuteQuery(Sprintf(R"(
--!syntax_v1
REPLACE INTO `/Root/KeyValue` (Key, Value) VALUES (%d, "%s")
)", i, TString(200000 + i, 'a' + (i % 26)).c_str()), NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
}
}

constexpr auto SimpleGraceJoinWithSpillingQuery = R"(
--!syntax_v1
PRAGMA ydb.EnableSpillingNodes="GraceJoin";
PRAGMA ydb.CostBasedOptimizationLevel='0';
PRAGMA ydb.HashJoinMode='graceandself';
select t1.Key, t1.Value, t2.Key, t2.Value
from `/Root/KeyValue` as t1 full join `/Root/KeyValue` as t2 on t1.Value = t2.Value
order by t1.Value
)";


} // anonymous namespace

Expand Down Expand Up @@ -79,31 +99,15 @@ Y_UNIT_TEST_TWIN(SpillingInRuntimeNodes, EnabledSpilling) {

auto db = kikimr.GetQueryClient();

for (ui32 i = 0; i < 300; ++i) {
auto result = db.ExecuteQuery(Sprintf(R"(
--!syntax_v1
REPLACE INTO `/Root/KeyValue` (Key, Value) VALUES (%d, "%s")
)", i, TString(200000 + i, 'a' + (i % 26)).c_str()), NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
}

auto query = R"(
--!syntax_v1
PRAGMA ydb.EnableSpillingNodes="GraceJoin";
PRAGMA ydb.CostBasedOptimizationLevel='0';
PRAGMA ydb.HashJoinMode='graceandself';
select t1.Key, t1.Value, t2.Key, t2.Value
from `/Root/KeyValue` as t1 full join `/Root/KeyValue` as t2 on t1.Value = t2.Value
order by t1.Value
)";
FillTableWithData(db);

auto explainMode = NYdb::NQuery::TExecuteQuerySettings().ExecMode(NYdb::NQuery::EExecMode::Explain);
auto planres = db.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx(), explainMode).ExtractValueSync();
auto planres = db.ExecuteQuery(SimpleGraceJoinWithSpillingQuery, NYdb::NQuery::TTxControl::NoTx(), explainMode).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(planres.GetStatus(), EStatus::SUCCESS, planres.GetIssues().ToString());

Cerr << planres.GetStats()->GetAst() << Endl;

auto result = db.ExecuteQuery(query, NYdb::NQuery::TTxControl::BeginTx().CommitTx(), NYdb::NQuery::TExecuteQuerySettings()).ExtractValueSync();
auto result = db.ExecuteQuery(SimpleGraceJoinWithSpillingQuery, NYdb::NQuery::TTxControl::BeginTx().CommitTx(), NYdb::NQuery::TExecuteQuerySettings()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());

TKqpCounters counters(kikimr.GetTestServer().GetRuntime()->GetAppData().Counters);
Expand All @@ -116,6 +120,24 @@ Y_UNIT_TEST_TWIN(SpillingInRuntimeNodes, EnabledSpilling) {
}
}

Y_UNIT_TEST(HandleErrorsCorrectly) {
Cerr << "cwd: " << NFs::CurrentWorkingDirectory() << Endl;
TKikimrRunner kikimr(AppCfgLowComputeLimits(0.01, false));

auto db = kikimr.GetQueryClient();

FillTableWithData(db);

auto explainMode = NYdb::NQuery::TExecuteQuerySettings().ExecMode(NYdb::NQuery::EExecMode::Explain);
auto planres = db.ExecuteQuery(SimpleGraceJoinWithSpillingQuery, NYdb::NQuery::TTxControl::NoTx(), explainMode).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(planres.GetStatus(), EStatus::SUCCESS, planres.GetIssues().ToString());

Cerr << planres.GetStats()->GetAst() << Endl;

auto result = db.ExecuteQuery(SimpleGraceJoinWithSpillingQuery, NYdb::NQuery::TTxControl::BeginTx().CommitTx(), NYdb::NQuery::TExecuteQuerySettings()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::INTERNAL_ERROR, result.GetIssues().ToString());
}

Y_UNIT_TEST(SelfJoinQueryService) {
Cerr << "cwd: " << NFs::CurrentWorkingDirectory() << Endl;

Expand Down
6 changes: 5 additions & 1 deletion ydb/core/tx/datashard/datashard_kqp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1012,7 +1012,11 @@ class TKqpTaskRunnerExecutionContext: public NDq::IDqTaskRunnerExecutionContext
return {};
}

std::function<void()> GetWakeupCallback() const override {
NDq::TWakeUpCallback GetWakeupCallback() const override {
return {};
}

NDq::TErrorCallback GetErrorCallback() const override {
return {};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,9 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC

Become(&TDqAsyncComputeActor::StateFuncWrapper<&TDqAsyncComputeActor::StateFuncBody>);

auto wakeup = [this]{ ContinueExecute(EResumeSource::CABootstrapWakeup); };
std::shared_ptr<IDqTaskRunnerExecutionContext> execCtx = std::make_shared<TDqTaskRunnerExecutionContext>(TxId, std::move(wakeup));
auto wakeupCallback = [this]{ ContinueExecute(EResumeSource::CABootstrapWakeup); };
auto errorCallback = [this](const TString& error){ SendError(error); };
std::shared_ptr<IDqTaskRunnerExecutionContext> execCtx = std::make_shared<TDqTaskRunnerExecutionContext>(TxId, std::move(wakeupCallback), std::move(errorCallback));

Send(TaskRunnerActorId,
new NTaskRunnerActor::TEvTaskRunnerCreate(
Expand Down
5 changes: 3 additions & 2 deletions ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,9 @@ class TDqComputeActor : public TDqSyncComputeActorBase<TDqComputeActor> {

auto taskRunner = TaskRunnerFactory(GetAllocatorPtr(), Task, RuntimeSettings.StatsMode, logger);
SetTaskRunner(taskRunner);
auto wakeup = [this]{ ContinueExecute(EResumeSource::CABootstrapWakeup); };
TDqTaskRunnerExecutionContext execCtx(TxId, std::move(wakeup));
auto wakeupCallback = [this]{ ContinueExecute(EResumeSource::CABootstrapWakeup); };
auto errorCallback = [this](const TString& error){ SendError(error); };
TDqTaskRunnerExecutionContext execCtx(TxId, std::move(wakeupCallback), std::move(errorCallback));
PrepareTaskRunner(execCtx);

ContinueExecute(EResumeSource::CABootstrap);
Expand Down
4 changes: 4 additions & 0 deletions ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -629,6 +629,10 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
}
}

void SendError(const TString& error) {
this->Send(this->SelfId(), TEvDq::TEvAbortExecution::InternalError(error));
}

protected: //TDqComputeActorChannels::ICallbacks
//i64 GetInputChannelFreeSpace(ui64 channelId) is pure and must be overridded in derived class

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ class TDqSyncComputeActorBase: public TDqComputeActorBase<TDerived, TComputeActo
TaskRunner->Prepare(this->Task, limits, execCtx);

if (this->Task.GetEnableSpilling()) {
TaskRunner->SetSpillerFactory(std::make_shared<TDqSpillerFactory>(execCtx.GetTxId(), NActors::TActivationContext::ActorSystem(), execCtx.GetWakeupCallback()));
TaskRunner->SetSpillerFactory(std::make_shared<TDqSpillerFactory>(execCtx.GetTxId(), NActors::TActivationContext::ActorSystem(), execCtx.GetWakeupCallback(), execCtx.GetErrorCallback()));
}

for (auto& [channelId, channel] : this->InputChannelsMap) {
Expand Down
15 changes: 10 additions & 5 deletions ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@
namespace NYql {
namespace NDq {

TDqTaskRunnerExecutionContext::TDqTaskRunnerExecutionContext(TTxId txId, IDqChannelStorage::TWakeUpCallback&& wakeUp)
TDqTaskRunnerExecutionContext::TDqTaskRunnerExecutionContext(TTxId txId, TWakeUpCallback&& wakeUpCallback, TErrorCallback&& errorCallback)
: TxId_(txId)
, WakeUp_(std::move(wakeUp))
, WakeUpCallback_(std::move(wakeUpCallback))
, ErrorCallback_(std::move(errorCallback))
{
}

Expand All @@ -18,14 +19,18 @@ IDqChannelStorage::TPtr TDqTaskRunnerExecutionContext::CreateChannelStorage(ui64

IDqChannelStorage::TPtr TDqTaskRunnerExecutionContext::CreateChannelStorage(ui64 channelId, bool withSpilling, NActors::TActorSystem* actorSystem) const {
if (withSpilling) {
return CreateDqChannelStorage(TxId_, channelId, WakeUp_, actorSystem);
return CreateDqChannelStorage(TxId_, channelId, WakeUpCallback_, ErrorCallback_, actorSystem);
} else {
return nullptr;
}
}

std::function<void()> TDqTaskRunnerExecutionContext::GetWakeupCallback() const {
return WakeUp_;
TWakeUpCallback TDqTaskRunnerExecutionContext::GetWakeupCallback() const {
return WakeUpCallback_;
}

TErrorCallback TDqTaskRunnerExecutionContext::GetErrorCallback() const {
return ErrorCallback_;
}

TTxId TDqTaskRunnerExecutionContext::GetTxId() const {
Expand Down
8 changes: 5 additions & 3 deletions ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,19 @@ namespace NDq {

class TDqTaskRunnerExecutionContext : public TDqTaskRunnerExecutionContextBase {
public:
TDqTaskRunnerExecutionContext(TTxId txId, IDqChannelStorage::TWakeUpCallback&& wakeUp);
TDqTaskRunnerExecutionContext(TTxId txId, TWakeUpCallback&& WakeUpCallback_, TErrorCallback&& ErrorCallback_);

IDqChannelStorage::TPtr CreateChannelStorage(ui64 channelId, bool withSpilling) const override;
IDqChannelStorage::TPtr CreateChannelStorage(ui64 channelId, bool withSpilling, NActors::TActorSystem* actorSystem) const override;

std::function<void()> GetWakeupCallback() const override;
TWakeUpCallback GetWakeupCallback() const override;
TErrorCallback GetErrorCallback() const override;
TTxId GetTxId() const override;

private:
const TTxId TxId_;
const IDqChannelStorage::TWakeUpCallback WakeUp_;
const TWakeUpCallback WakeUpCallback_;
const TErrorCallback ErrorCallback_;
};

} // namespace NDq
Expand Down
11 changes: 7 additions & 4 deletions ydb/library/yql/dq/actors/spilling/channel_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ class TDqChannelStorage : public IDqChannelStorage {
NThreading::TFuture<void> IsBlobWrittenFuture_;
};
public:
TDqChannelStorage(TTxId txId, ui64 channelId, TWakeUpCallback&& wakeUp, TActorSystem* actorSystem)
TDqChannelStorage(TTxId txId, ui64 channelId, TWakeUpCallback&& wakeUpCallback, TErrorCallback&& errorCallback, TActorSystem* actorSystem)
: ActorSystem_(actorSystem)
{
ChannelStorageActor_ = CreateDqChannelStorageActor(txId, channelId, std::move(wakeUp), actorSystem);
ChannelStorageActor_ = CreateDqChannelStorageActor(txId, channelId, std::move(wakeUpCallback), std::move(errorCallback), actorSystem);
ChannelStorageActorId_ = ActorSystem_->Register(ChannelStorageActor_->GetActor());
}

Expand Down Expand Up @@ -119,9 +119,12 @@ class TDqChannelStorage : public IDqChannelStorage {

} // anonymous namespace

IDqChannelStorage::TPtr CreateDqChannelStorage(TTxId txId, ui64 channelId, IDqChannelStorage::TWakeUpCallback wakeUp, TActorSystem* actorSystem)
IDqChannelStorage::TPtr CreateDqChannelStorage(TTxId txId, ui64 channelId,
TWakeUpCallback wakeUpCallback,
TErrorCallback errorCallback,
TActorSystem* actorSystem)
{
return new TDqChannelStorage(txId, channelId, std::move(wakeUp), actorSystem);
return new TDqChannelStorage(txId, channelId, std::move(wakeUpCallback), std::move(errorCallback), actorSystem);
}

} // namespace NYql::NDq
4 changes: 3 additions & 1 deletion ydb/library/yql/dq/actors/spilling/channel_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ namespace NActors {
namespace NYql::NDq {

IDqChannelStorage::TPtr CreateDqChannelStorage(TTxId txId, ui64 channelId,
IDqChannelStorage::TWakeUpCallback wakeUpCb, NActors::TActorSystem* actorSystem);
TWakeUpCallback wakeUpCallback,
TErrorCallback errorCallback,
NActors::TActorSystem* actorSystem);

} // namespace NYql::NDq
27 changes: 16 additions & 11 deletions ydb/library/yql/dq/actors/spilling/channel_storage_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,11 @@ class TDqChannelStorageActor : public IDqChannelStorageActor,
using TBase = TActorBootstrapped<TDqChannelStorageActor>;
public:

TDqChannelStorageActor(TTxId txId, ui64 channelId, IDqChannelStorage::TWakeUpCallback&& wakeUp, TActorSystem* actorSystem)
TDqChannelStorageActor(TTxId txId, ui64 channelId, TWakeUpCallback&& wakeUpCallback, TErrorCallback&& errorCallback, TActorSystem* actorSystem)
: TxId_(txId)
, ChannelId_(channelId)
, WakeUp_(std::move(wakeUp))
, WakeUpCallback_(std::move(wakeUpCallback))
, ErrorCallback_(std::move(errorCallback))
, ActorSystem_(actorSystem)
{}

Expand All @@ -65,13 +66,12 @@ class TDqChannelStorageActor : public IDqChannelStorageActor,

protected:
void FailWithError(const TString& error) {
if (!ErrorCallback_) Y_ABORT("Error: %s", error.c_str());

LOG_E("Error: " << error);
ErrorCallback_(error);
SendInternal(SpillingActorId_, new TEvents::TEvPoison);
PassAway();

// Currently there is no better way to handle the error.
// Since the message was not sent from the actor system, there is no one to send the error message to.
Y_ABORT("Error: %s", error.c_str());
}

void SendInternal(const TActorId& recipient, IEventBase* ev, TEventFlags flags = IEventHandle::FlagTrackDelivery) {
Expand Down Expand Up @@ -130,7 +130,7 @@ class TDqChannelStorageActor : public IDqChannelStorageActor,
it->second.SetValue();
WritingBlobs_.erase(it);

WakeUp_();
WakeUpCallback_();
}

void HandleWork(TEvDqSpilling::TEvReadResult::TPtr& ev) {
Expand All @@ -146,7 +146,7 @@ class TDqChannelStorageActor : public IDqChannelStorageActor,
it->second.SetValue(std::move(msg.Blob));
LoadingBlobs_.erase(it);

WakeUp_();
WakeUpCallback_();
}

void HandleWork(TEvDqSpilling::TEvError::TPtr& ev) {
Expand All @@ -163,7 +163,8 @@ class TDqChannelStorageActor : public IDqChannelStorageActor,
private:
const TTxId TxId_;
const ui64 ChannelId_;
IDqChannelStorage::TWakeUpCallback WakeUp_;
TWakeUpCallback WakeUpCallback_;
TErrorCallback ErrorCallback_;
TActorId SpillingActorId_;

// BlobId -> promise that blob is saved
Expand All @@ -177,8 +178,12 @@ class TDqChannelStorageActor : public IDqChannelStorageActor,

} // anonymous namespace

IDqChannelStorageActor* CreateDqChannelStorageActor(TTxId txId, ui64 channelId, IDqChannelStorage::TWakeUpCallback&& wakeUp, NActors::TActorSystem* actorSystem) {
return new TDqChannelStorageActor(txId, channelId, std::move(wakeUp), actorSystem);
IDqChannelStorageActor* CreateDqChannelStorageActor(TTxId txId, ui64 channelId,
TWakeUpCallback&& wakeUpCallback,
TErrorCallback&& errorCallback,
NActors::TActorSystem* actorSystem)
{
return new TDqChannelStorageActor(txId, channelId, std::move(wakeUpCallback), std::move(errorCallback), actorSystem);
}

} // namespace NYql::NDq
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,6 @@ class IDqChannelStorageActor
virtual NActors::IActor* GetActor() = 0;
};

IDqChannelStorageActor* CreateDqChannelStorageActor(TTxId txId, ui64 channelId, IDqChannelStorage::TWakeUpCallback&& wakeUp, NActors::TActorSystem* actorSystem);
IDqChannelStorageActor* CreateDqChannelStorageActor(TTxId txId, ui64 channelId, TWakeUpCallback&& wakeUpCallback, TErrorCallback&& errorCallback, NActors::TActorSystem* actorSystem);

} // namespace NYql::NDq
4 changes: 2 additions & 2 deletions ydb/library/yql/dq/actors/spilling/compute_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ namespace NYql::NDq {

using namespace NActors;

TDqComputeStorage::TDqComputeStorage(TTxId txId, std::function<void()> wakeUpCallback, TActorSystem* actorSystem) : ActorSystem_(actorSystem) {
TDqComputeStorage::TDqComputeStorage(TTxId txId, TWakeUpCallback wakeUpCallback, TErrorCallback errorCallback, TActorSystem* actorSystem) : ActorSystem_(actorSystem) {
TStringStream spillerName;
spillerName << "Spiller" << "_" << CreateGuidAsString();
ComputeStorageActor_ = CreateDqComputeStorageActor(txId, spillerName.Str(), wakeUpCallback);
ComputeStorageActor_ = CreateDqComputeStorageActor(txId, spillerName.Str(), wakeUpCallback, errorCallback);
ComputeStorageActorId_ = ActorSystem_->Register(ComputeStorageActor_->GetActor());
}

Expand Down
2 changes: 1 addition & 1 deletion ydb/library/yql/dq/actors/spilling/compute_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class TDqComputeStorage : public NKikimr::NMiniKQL::ISpiller
{
public:

TDqComputeStorage(TTxId txId, std::function<void()> wakeUpCallback, NActors::TActorSystem* actorSystem);
TDqComputeStorage(TTxId txId, TWakeUpCallback wakeUpCallback, TErrorCallback errorCallback, NActors::TActorSystem* actorSystem);

~TDqComputeStorage();

Expand Down
Loading