Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions ydb/core/kqp/executer_actor/kqp_data_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,8 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da

} catch (const yexception& e) {
InternalError(e.what());
} catch (const TMemoryLimitExceededException&) {
RuntimeError(Ydb::StatusIds::PRECONDITION_FAILED, NYql::TIssues({NYql::TIssue(BuildMemoryLimitExceptionMessage())}));
}
ReportEventElapsedTime();
}
Expand Down Expand Up @@ -362,7 +364,11 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
} catch (const yexception& e) {
CancelProposal(0);
InternalError(e.what());
} catch (const TMemoryLimitExceededException& e) {
CancelProposal(0);
RuntimeError(Ydb::StatusIds::PRECONDITION_FAILED, NYql::TIssues({NYql::TIssue(BuildMemoryLimitExceptionMessage())}));
}

ReportEventElapsedTime();
}

Expand Down Expand Up @@ -944,6 +950,12 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
}
} catch (const yexception& e) {
InternalError(e.what());
} catch (const TMemoryLimitExceededException) {
if (ReadOnlyTx) {
RuntimeError(Ydb::StatusIds::PRECONDITION_FAILED, NYql::TIssues({NYql::TIssue(BuildMemoryLimitExceptionMessage())}));
} else {
RuntimeError(Ydb::StatusIds::UNDETERMINED, NYql::TIssues({NYql::TIssue(BuildMemoryLimitExceptionMessage())}));
}
}
ReportEventElapsedTime();
}
Expand Down
8 changes: 8 additions & 0 deletions ydb/core/kqp/executer_actor/kqp_executer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,14 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
return TActorBootstrapped<TDerived>::SelfId();
}

TString BuildMemoryLimitExceptionMessage() const {
if (Request.TxAlloc) {
return TStringBuilder() << "Memory limit exception at " << CurrentStateFuncName()
<< ", current limit is " << Request.TxAlloc->Alloc.GetLimit() << " bytes.";
}
return TStringBuilder() << "Memory limit exception at " << CurrentStateFuncName();
}

void ReportEventElapsedTime() {
if (Stats) {
ui64 elapsedMicros = TlsActivationContext->GetCurrentEventTicksAsSeconds() * 1'000'000;
Expand Down
18 changes: 0 additions & 18 deletions ydb/core/kqp/executer_actor/kqp_literal_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,24 +146,6 @@ class TKqpLiteralExecuter {
return;
}

ui64 mkqlMemoryLimit = Request.MkqlMemoryLimit > 0
? Request.MkqlMemoryLimit
: 1_GB;

auto& alloc = Request.TxAlloc->Alloc;
auto rmConfig = GetKqpResourceManager()->GetConfig();
ui64 mkqlInitialLimit = std::min(mkqlMemoryLimit, rmConfig.GetMkqlLightProgramMemoryLimit());
ui64 mkqlMaxLimit = std::max(mkqlMemoryLimit, rmConfig.GetMkqlLightProgramMemoryLimit());
alloc.SetLimit(mkqlInitialLimit);

// TODO: KIKIMR-15350
alloc.Ref().SetIncreaseMemoryLimitCallback([this, &alloc, mkqlMaxLimit](ui64 currentLimit, ui64 required) {
if (required < mkqlMaxLimit) {
LOG_D("Increase memory limit from " << currentLimit << " to " << required);
alloc.SetLimit(required);
}
});

// task runner settings
ComputeCtx = std::make_unique<NMiniKQL::TKqpComputeContextBase>();
RunnerContext = CreateTaskRunnerContext(ComputeCtx.get(), &Request.TxAlloc->TypeEnv);
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ class TKqpScanExecuter : public TKqpExecuterBase<TKqpScanExecuter, EExecType::Sc

} catch (const yexception& e) {
InternalError(e.what());
} catch (const TMemoryLimitExceededException&) {
RuntimeError(Ydb::StatusIds::PRECONDITION_FAILED, NYql::TIssues({NYql::TIssue(BuildMemoryLimitExceptionMessage())}));
}
ReportEventElapsedTime();
}
Expand Down Expand Up @@ -124,6 +126,8 @@ class TKqpScanExecuter : public TKqpExecuterBase<TKqpScanExecuter, EExecType::Sc
}
} catch (const yexception& e) {
InternalError(e.what());
} catch (const TMemoryLimitExceededException&) {
RuntimeError(Ydb::StatusIds::PRECONDITION_FAILED, NYql::TIssues({NYql::TIssue(BuildMemoryLimitExceptionMessage())}));
}
ReportEventElapsedTime();
}
Expand Down
53 changes: 49 additions & 4 deletions ydb/core/kqp/session_actor/kqp_session_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <ydb/core/kqp/rm_service/kqp_snapshot_manager.h>
#include <ydb/core/ydb_convert/ydb_convert.h>
#include <ydb/core/tx/schemeshard/schemeshard.h>
#include <ydb/core/kqp/rm_service/kqp_rm_service.h>
#include <ydb/public/lib/operation_id/operation_id.h>

#include <ydb/core/util/ulid.h>
Expand Down Expand Up @@ -619,6 +620,23 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
QueryState->TxId = UlidGen.Next();
QueryState->TxCtx = MakeIntrusive<TKqpTransactionContext>(false, AppData()->FunctionRegistry,
AppData()->TimeProvider, AppData()->RandomProvider, Config->EnableKqpImmediateEffects);

auto& alloc = QueryState->TxCtx->TxAlloc;
ui64 mkqlInitialLimit = Settings.MkqlInitialMemoryLimit;

const auto& queryLimitsProto = Settings.TableService.GetQueryLimits();
const auto& phaseLimitsProto = queryLimitsProto.GetPhaseLimits();
ui64 mkqlMaxLimit = phaseLimitsProto.GetComputeNodeMemoryLimitBytes();
mkqlMaxLimit = mkqlMaxLimit ? mkqlMaxLimit : ui64(Settings.MkqlMaxMemoryLimit);

alloc->Alloc.SetLimit(mkqlInitialLimit);
alloc->Alloc.Ref().SetIncreaseMemoryLimitCallback([this, &alloc, mkqlMaxLimit](ui64 currentLimit, ui64 required) {
if (required < mkqlMaxLimit) {
LOG_D("Increase memory limit from " << currentLimit << " to " << required);
alloc->Alloc.SetLimit(required);
}
});

QueryState->QueryData = std::make_shared<TQueryData>(QueryState->TxCtx->TxAlloc);
QueryState->TxCtx->SetIsolationLevel(settings);
QueryState->TxCtx->OnBeginQuery();
Expand Down Expand Up @@ -748,6 +766,8 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
}
} catch(const yexception& ex) {
ythrow TRequestFail(Ydb::StatusIds::BAD_REQUEST) << ex.what();
} catch(const TMemoryLimitExceededException&) {
ythrow TRequestFail(Ydb::StatusIds::BAD_REQUEST) << BuildMemoryLimitExceptionMessage();
}
return true;
}
Expand Down Expand Up @@ -2051,6 +2071,9 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
ReplyQueryError(ex.Status, ex.what(), ex.Issues);
} catch (const yexception& ex) {
InternalError(ex.what());
} catch (const TMemoryLimitExceededException&) {
ReplyQueryError(Ydb::StatusIds::INTERNAL_ERROR,
BuildMemoryLimitExceptionMessage());
}
}

Expand Down Expand Up @@ -2090,6 +2113,9 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
ReplyQueryError(ex.Status, ex.what(), ex.Issues);
} catch (const yexception& ex) {
InternalError(ex.what());
} catch (const TMemoryLimitExceededException&) {
ReplyQueryError(Ydb::StatusIds::UNDETERMINED,
BuildMemoryLimitExceptionMessage());
}
}

Expand Down Expand Up @@ -2125,14 +2151,24 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
}
} catch (const yexception& ex) {
InternalError(ex.what());
} catch (const TMemoryLimitExceededException&) {
ReplyQueryError(Ydb::StatusIds::INTERNAL_ERROR,
BuildMemoryLimitExceptionMessage());
}
}

STATEFN(FinalCleanupState) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvents::TEvGone, HandleFinalCleanup);
hFunc(TEvents::TEvUndelivered, HandleNoop);
hFunc(TEvKqpSnapshot::TEvCreateSnapshotResponse, Handle);
try {
switch (ev->GetTypeRewrite()) {
hFunc(TEvents::TEvGone, HandleFinalCleanup);
hFunc(TEvents::TEvUndelivered, HandleNoop);
hFunc(TEvKqpSnapshot::TEvCreateSnapshotResponse, Handle);
}
} catch (const yexception& ex) {
InternalError(ex.what());
} catch (const TMemoryLimitExceededException&) {
ReplyQueryError(Ydb::StatusIds::INTERNAL_ERROR,
BuildMemoryLimitExceptionMessage());
}
}

Expand Down Expand Up @@ -2165,6 +2201,15 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
}
}

TString BuildMemoryLimitExceptionMessage() const {
if (QueryState && QueryState->TxCtx) {
return TStringBuilder() << "Memory limit exception at " << CurrentStateFuncName()
<< ", current limit is " << QueryState->TxCtx->TxAlloc->Alloc.GetLimit() << " bytes.";
} else {
return TStringBuilder() << "Memory limit exception at " << CurrentStateFuncName();
}
}

void ProcessTopicOps(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
YQL_ENSURE(ev->Get()->Request);
if (ev->Get()->Request->Cookie < QueryId) {
Expand Down
16 changes: 14 additions & 2 deletions ydb/core/kqp/session_actor/kqp_session_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <ydb/core/protos/config.pb.h>
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h>

#include <ydb/core/control/immediate_control_board_wrapper.h>
#include <ydb/library/actors/core/actorid.h>

namespace NKikimr::NKqp {
Expand All @@ -19,17 +20,28 @@ struct TKqpWorkerSettings {
NKikimrConfig::TTableServiceConfig TableService;
NKikimrConfig::TQueryServiceConfig QueryService;

TControlWrapper MkqlInitialMemoryLimit;
TControlWrapper MkqlMaxMemoryLimit;

TKqpDbCountersPtr DbCounters;

TKqpWorkerSettings(const TString& cluster, const TString& database,
explicit TKqpWorkerSettings(const TString& cluster, const TString& database,
const NKikimrConfig::TTableServiceConfig& tableServiceConfig,
const NKikimrConfig::TQueryServiceConfig& queryServiceConfig,
TKqpDbCountersPtr dbCounters)
: Cluster(cluster)
, Database(database)
, TableService(tableServiceConfig)
, QueryService(queryServiceConfig)
, DbCounters(dbCounters) {}
, MkqlInitialMemoryLimit(2097152, 1, Max<i64>())
, MkqlMaxMemoryLimit(1073741824, 1, Max<i64>())
, DbCounters(dbCounters)
{
AppData()->Icb->RegisterSharedControl(
MkqlInitialMemoryLimit, "KqpSession.MkqlInitialMemoryLimit");
AppData()->Icb->RegisterSharedControl(
MkqlMaxMemoryLimit, "KqpSession.MkqlMaxMemoryLimit");
}
};

IActor* CreateKqpSessionActor(const TActorId& owner, const TString& sessionId,
Expand Down
81 changes: 81 additions & 0 deletions ydb/core/kqp/ut/query/kqp_limits_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,87 @@ namespace {
}

Y_UNIT_TEST_SUITE(KqpLimits) {
Y_UNIT_TEST(KqpMkqlMemoryLimitException) {
TKikimrRunner kikimr;
CreateLargeTable(kikimr, 10, 10, 1'000'000, 1);

kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_SLOW_LOG, NActors::NLog::PRI_ERROR);

TControlWrapper mkqlInitialMemoryLimit;
TControlWrapper mkqlMaxMemoryLimit;

mkqlInitialMemoryLimit = kikimr.GetTestServer().GetRuntime()->GetAppData().Icb->RegisterSharedControl(
mkqlInitialMemoryLimit, "KqpSession.MkqlInitialMemoryLimit");
mkqlMaxMemoryLimit = kikimr.GetTestServer().GetRuntime()->GetAppData().Icb->RegisterSharedControl(
mkqlMaxMemoryLimit, "KqpSession.MkqlMaxMemoryLimit");

mkqlInitialMemoryLimit = 1_KB;
mkqlMaxMemoryLimit = 1_KB;

auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();

auto result = session.ExecuteDataQuery(Q1_(R"(
SELECT * FROM `/Root/LargeTable`;
)"), TTxControl::BeginTx().CommitTx()).ExtractValueSync();
result.GetIssues().PrintTo(Cerr);
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::PRECONDITION_FAILED);
}

Y_UNIT_TEST(LargeParametersAndMkqlFailure) {
auto app = NKikimrConfig::TAppConfig();
app.MutableTableServiceConfig()->MutableResourceManager()->SetMkqlLightProgramMemoryLimit(1'000'000'000);

TKikimrRunner kikimr(app);
CreateLargeTable(kikimr, 0, 0, 0);

kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_SLOW_LOG, NActors::NLog::PRI_ERROR);

auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();

TControlWrapper mkqlInitialMemoryLimit;
TControlWrapper mkqlMaxMemoryLimit;

mkqlInitialMemoryLimit = kikimr.GetTestServer().GetRuntime()->GetAppData().Icb->RegisterSharedControl(
mkqlInitialMemoryLimit, "KqpSession.MkqlInitialMemoryLimit");
mkqlMaxMemoryLimit = kikimr.GetTestServer().GetRuntime()->GetAppData().Icb->RegisterSharedControl(
mkqlMaxMemoryLimit, "KqpSession.MkqlMaxMemoryLimit");


mkqlInitialMemoryLimit = 1_KB;
mkqlMaxMemoryLimit = 1_KB;

auto paramsBuilder = db.GetParamsBuilder();
auto& rowsParam = paramsBuilder.AddParam("$rows");

rowsParam.BeginList();
for (ui32 i = 0; i < 100; ++i) {
rowsParam.AddListItem()
.BeginStruct()
.AddMember("Key")
.OptionalUint64(i)
.AddMember("KeyText")
.OptionalString(TString(5000, '0' + i % 10))
.AddMember("Data")
.OptionalInt64(i)
.AddMember("DataText")
.OptionalString(TString(16, '0' + (i + 1) % 10))
.EndStruct();
}
rowsParam.EndList();
rowsParam.Build();

auto result = session.ExecuteDataQuery(Q1_(R"(
DECLARE $rows AS List<Struct<Key: Uint64?, KeyText: String?, Data: Int64?, DataText: String?>>;

UPSERT INTO `/Root/LargeTable`
SELECT * FROM AS_TABLE($rows);
)"), TTxControl::BeginTx().CommitTx(), paramsBuilder.Build()).ExtractValueSync();
result.GetIssues().PrintTo(Cerr);
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::BAD_REQUEST);
}

Y_UNIT_TEST(DatashardProgramSize) {
auto app = NKikimrConfig::TAppConfig();
app.MutableTableServiceConfig()->MutableResourceManager()->SetMkqlLightProgramMemoryLimit(1'000'000'000);
Expand Down