Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ydb/core/fq/libs/compute/ydb/resources_cleaner_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ class TResourcesCleanerActor : public TBaseComputeActor<TResourcesCleanerActor>
void Handle(const TEvYdbCompute::TEvForgetOperationResponse::TPtr& ev) {
const auto& response = *ev.Get()->Get();
if (response.Status == NYdb::EStatus::TIMEOUT || response.Status == NYdb::EStatus::CLIENT_DEADLINE_EXCEEDED) {
LOG_I("Operation partly forgotten, will be retried: " << response.Status);
SendForgetOperation(TDuration::MilliSeconds(BackoffTimer.NextBackoffMs()));
return;
}
Expand Down
16 changes: 13 additions & 3 deletions ydb/core/fq/libs/compute/ydb/ydb_run_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,14 @@ class TYdbRunActor : public NActors::TActorBootstrapped<TYdbRunActor> {
ResignAndPassAway(response.Issues);
return;
}
ExecStatus = response.ExecStatus;
Params.Status = response.ComputeStatus;

if (!OperationIsFailing() || response.ExecStatus != NYdb::NQuery::EExecStatus::Completed) {
ExecStatus = response.ExecStatus;
Params.Status = response.ComputeStatus;
}

LOG_I("StatusTrackerResponse (success) " << response.Status << " ExecStatus: " << static_cast<int>(response.ExecStatus) << " Issues: " << response.Issues.ToOneLineString());
if (response.ExecStatus == NYdb::NQuery::EExecStatus::Completed) {
if (ExecStatus == NYdb::NQuery::EExecStatus::Completed) {
Register(ActorFactory->CreateResultWriter(SelfId(), Connector, Pinger, Params.OperationId, true).release());
} else {
CreateResourcesCleaner();
Expand Down Expand Up @@ -248,6 +252,12 @@ class TYdbRunActor : public NActors::TActorBootstrapped<TYdbRunActor> {
return true;
}

bool OperationIsFailing() const {
return Params.Status == FederatedQuery::QueryMeta::FAILING
|| Params.Status == FederatedQuery::QueryMeta::ABORTING_BY_USER
|| Params.Status == FederatedQuery::QueryMeta::ABORTING_BY_SYSTEM;
}

private:
bool IsAborted = false;
bool FinalizationStarted = false;
Expand Down
12 changes: 12 additions & 0 deletions ydb/core/kqp/common/events/script_executions.h
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,18 @@ struct TEvSaveScriptResultMetaFinished : public NActors::TEventLocal<TEvSaveScri
NYql::TIssues Issues;
};

struct TEvSaveScriptResultPartFinished : public NActors::TEventLocal<TEvSaveScriptResultPartFinished, TKqpScriptExecutionEvents::EvSaveScriptResultPartFinished> {
TEvSaveScriptResultPartFinished(Ydb::StatusIds::StatusCode status, i64 savedSize, NYql::TIssues issues = {})
: Status(status)
, SavedSize(savedSize)
, Issues(std::move(issues))
{}

Ydb::StatusIds::StatusCode Status;
i64 SavedSize;
NYql::TIssues Issues;
};

struct TEvSaveScriptResultFinished : public NActors::TEventLocal<TEvSaveScriptResultFinished, TKqpScriptExecutionEvents::EvSaveScriptResultFinished> {
TEvSaveScriptResultFinished(Ydb::StatusIds::StatusCode status, NYql::TIssues issues = {})
: Status(status)
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/common/simple/kqp_event_ids.h
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ struct TKqpScriptExecutionEvents {
EvSaveScriptFinalStatusResponse,
EvGetScriptExecutionOperationQueryResponse,
EvDescribeSecretsResponse,
EvSaveScriptResultPartFinished,
};
};

Expand Down
113 changes: 87 additions & 26 deletions ydb/core/kqp/proxy_service/kqp_script_executions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ class TScriptExecutionsTablesCreator : public TActorBootstrapped<TScriptExecutio
Col("row_id", NScheme::NTypeIds::Int64),
Col("expire_at", NScheme::NTypeIds::Timestamp),
Col("result_set", NScheme::NTypeIds::String),
Col("accumulated_size", NScheme::NTypeIds::Int64),
},
{ "database", "execution_id", "result_set_id", "row_id" },
NKikimrServices::KQP_PROXY,
Expand Down Expand Up @@ -832,9 +833,11 @@ class TCheckLeaseStatusActor : public TCheckLeaseStatusActorBase {

class TForgetScriptExecutionOperationQueryActor : public TQueryBase {
static constexpr i64 MAX_NUMBER_ROWS_IN_BATCH = 100000;
static constexpr i64 MAX_BATCH_SIZE = 10_MB;

struct TResultSetDescription {
i64 MaxRowId;
i64 MaxAccumulatedSize;
i32 ResultSetId;
};

Expand All @@ -859,7 +862,7 @@ class TForgetScriptExecutionOperationQueryActor : public TQueryBase {
FROM `.metadata/script_executions`
WHERE database = $database AND execution_id = $execution_id;

SELECT result_set_id, MAX(row_id) AS max_row_id
SELECT result_set_id, MAX(row_id) AS max_row_id, MAX(accumulated_size) AS max_accumulated_size
FROM `.metadata/result_sets`
WHERE database = $database AND execution_id = $execution_id
GROUP BY result_set_id;
Expand Down Expand Up @@ -908,17 +911,15 @@ class TForgetScriptExecutionOperationQueryActor : public TQueryBase {
return;
}

ResultSetsDescription.emplace_back(TResultSetDescription{*maxRowId, *resultSetId});
i64 maxAccumulatedSize = result.ColumnParser("max_accumulated_size").GetOptionalInt64().GetOrElse(0);

ResultSetsDescription.emplace_back(TResultSetDescription{*maxRowId, maxAccumulatedSize, *resultSetId});
}

DeleteScriptResults();
}

void DeleteScriptResults() {
while (!ResultSetsDescription.empty() && ResultSetsDescription.back().MaxRowId < 0) {
ResultSetsDescription.pop_back();
}

if (ResultSetsDescription.empty()) {
Finish();
return;
Expand All @@ -928,22 +929,32 @@ class TForgetScriptExecutionOperationQueryActor : public TQueryBase {
return;
}

TResultSetDescription& resultSet = ResultSetsDescription.back();
resultSet.MaxRowId -= MAX_NUMBER_ROWS_IN_BATCH;
const TResultSetDescription& resultSet = ResultSetsDescription.back();

TString sql = R"(
-- TForgetScriptExecutionOperationQueryActor::DeleteScriptResults
DECLARE $database AS Text;
DECLARE $execution_id AS Text;
DECLARE $result_set_id AS Int32;
DECLARE $max_row_id AS Int64;
DECLARE $max_rows_in_batch AS Int64;
DECLARE $min_accumulated_size AS Int64;

DELETE
FROM `.metadata/result_sets`
WHERE database = $database
AND execution_id = $execution_id
AND result_set_id = $result_set_id
AND row_id > $max_row_id;
AND (row_id = $max_row_id OR (
$max_row_id - row_id < $max_rows_in_batch
AND (accumulated_size IS NULL OR accumulated_size - LEN(result_set) >= $min_accumulated_size)
));

SELECT MAX(row_id) AS max_row_id, MAX(accumulated_size) AS max_accumulated_size
FROM `.metadata/result_sets`
WHERE database = $database
AND execution_id = $execution_id
AND result_set_id = $result_set_id;
)";

NYdb::TParamsBuilder params;
Expand All @@ -959,10 +970,42 @@ class TForgetScriptExecutionOperationQueryActor : public TQueryBase {
.Build()
.AddParam("$max_row_id")
.Int64(resultSet.MaxRowId)
.Build()
.AddParam("$max_rows_in_batch")
.Int64(MAX_NUMBER_ROWS_IN_BATCH)
.Build()
.AddParam("$min_accumulated_size")
.Int64(resultSet.MaxAccumulatedSize - MAX_BATCH_SIZE)
.Build();

RunDataQuery(sql, &params);
SetQueryResultHandler(&TForgetScriptExecutionOperationQueryActor::DeleteScriptResults);
SetQueryResultHandler(&TForgetScriptExecutionOperationQueryActor::OnResultsDeleted);
}

void OnResultsDeleted() {
if (ResultSets.size() != 1) {
Finish(Ydb::StatusIds::INTERNAL_ERROR, "Unexpected database response");
return;
}

NYdb::TResultSetParser result(ResultSets[0]);
if (result.RowsCount() != 1) {
Finish(Ydb::StatusIds::INTERNAL_ERROR, "Unexpected database response");
return;
}

result.TryNextRow();
TMaybe<i64> maxRowId = result.ColumnParser("max_row_id").GetOptionalInt64();
TMaybe<i64> maxAccumulatedSize = result.ColumnParser("max_accumulated_size").GetOptionalInt64();

if (maxRowId) {
ResultSetsDescription.back().MaxRowId = *maxRowId;
ResultSetsDescription.back().MaxAccumulatedSize = maxAccumulatedSize.GetOrElse(0);
} else {
ResultSetsDescription.pop_back();
}

DeleteScriptResults();
}

void OnFinish(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) override {
Expand Down Expand Up @@ -1723,10 +1766,16 @@ class TSaveScriptExecutionResultMetaQuery : public TQueryBase {

class TSaveScriptExecutionResultQuery : public TQueryBase {
public:
TSaveScriptExecutionResultQuery(const TString& database, const TString& executionId, i32 resultSetId, TMaybe<TInstant> expireAt, i64 firstRow, Ydb::ResultSet resultSet)
: Database(database), ExecutionId(executionId), ResultSetId(resultSetId), ExpireAt(expireAt), FirstRow(firstRow), ResultSet(std::move(resultSet))
{
}
TSaveScriptExecutionResultQuery(const TString& database, const TString& executionId, i32 resultSetId,
TMaybe<TInstant> expireAt, i64 firstRow, i64 accumulatedSize, Ydb::ResultSet resultSet)
: Database(database)
, ExecutionId(executionId)
, ResultSetId(resultSetId)
, ExpireAt(expireAt)
, FirstRow(firstRow)
, AccumulatedSize(accumulatedSize)
, ResultSet(std::move(resultSet))
{}

void OnRunQuery() override {
TString sql = R"(
Expand All @@ -1735,11 +1784,11 @@ class TSaveScriptExecutionResultQuery : public TQueryBase {
DECLARE $execution_id AS Text;
DECLARE $result_set_id AS Int32;
DECLARE $expire_at AS Optional<Timestamp>;
DECLARE $items AS List<Struct<row_id:Int64,result_set:String>>;
DECLARE $items AS List<Struct<row_id:Int64,result_set:String,accumulated_size:Int64>>;

UPSERT INTO `.metadata/result_sets`
SELECT $database as database, $execution_id as execution_id, $result_set_id as result_set_id,
T.row_id as row_id, $expire_at as expire_at, T.result_set as result_set
T.row_id as row_id, $expire_at as expire_at, T.result_set as result_set, T.accumulated_size as accumulated_size
FROM AS_TABLE($items) AS T;
)";

Expand All @@ -1765,14 +1814,18 @@ class TSaveScriptExecutionResultQuery : public TQueryBase {
.BeginList();

auto row = FirstRow;
for(auto& rowValue : ResultSet.rows()) {
for (const auto& rowValue : ResultSet.rows()) {
auto rowValueSerialized = rowValue.SerializeAsString();
SavedSize += rowValueSerialized.size();
param
.AddListItem()
.BeginStruct()
.AddMember("row_id")
.Int64(row++)
.AddMember("result_set")
.String(rowValue.SerializeAsString())
.String(std::move(rowValueSerialized))
.AddMember("accumulated_size")
.Int64(AccumulatedSize + SavedSize)
.EndStruct();
}
param
Expand All @@ -1788,9 +1841,9 @@ class TSaveScriptExecutionResultQuery : public TQueryBase {

void OnFinish(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) override {
if (status == Ydb::StatusIds::SUCCESS) {
Send(Owner, new TEvSaveScriptResultFinished(status));
Send(Owner, new TEvSaveScriptResultPartFinished(status, SavedSize));
} else {
Send(Owner, new TEvSaveScriptResultFinished(status, std::move(issues)));
Send(Owner, new TEvSaveScriptResultPartFinished(status, SavedSize, std::move(issues)));
}
}

Expand All @@ -1800,7 +1853,9 @@ class TSaveScriptExecutionResultQuery : public TQueryBase {
const i32 ResultSetId;
const TMaybe<TInstant> ExpireAt;
const i64 FirstRow;
const i64 AccumulatedSize;
const Ydb::ResultSet ResultSet;
i64 SavedSize = 0;
};

class TSaveScriptExecutionResultActor : public TActorBootstrapped<TSaveScriptExecutionResultActor> {
Expand All @@ -1809,13 +1864,16 @@ class TSaveScriptExecutionResultActor : public TActorBootstrapped<TSaveScriptExe
static constexpr ui64 PROGRAM_BASE_SIZE = 1_MB; // Depends on MAX_NUMBER_ROWS_IN_BATCH

public:
TSaveScriptExecutionResultActor(const NActors::TActorId& replyActorId, const TString& database, const TString& executionId, i32 resultSetId, TMaybe<TInstant> expireAt, i64 firstRow, Ydb::ResultSet&& resultSet)
TSaveScriptExecutionResultActor(const NActors::TActorId& replyActorId, const TString& database,
const TString& executionId, i32 resultSetId, TMaybe<TInstant> expireAt,
i64 firstRow, i64 accumulatedSize, Ydb::ResultSet&& resultSet)
: ReplyActorId(replyActorId)
, Database(database)
, ExecutionId(executionId)
, ResultSetId(resultSetId)
, ExpireAt(expireAt)
, FirstRow(firstRow)
, AccumulatedSize(accumulatedSize)
, RowsSplitter(std::move(resultSet), PROGRAM_SIZE_LIMIT, PROGRAM_BASE_SIZE, MAX_NUMBER_ROWS_IN_BATCH)
{}

Expand All @@ -1826,7 +1884,7 @@ class TSaveScriptExecutionResultActor : public TActorBootstrapped<TSaveScriptExe
}

i64 numberRows = ResultSets.back().rows_size();
Register(new TQueryRetryActor<TSaveScriptExecutionResultQuery, TEvSaveScriptResultFinished, TString, TString, i32, TMaybe<TInstant>, i64, Ydb::ResultSet>(SelfId(), Database, ExecutionId, ResultSetId, ExpireAt, FirstRow, ResultSets.back()));
Register(new TQueryRetryActor<TSaveScriptExecutionResultQuery, TEvSaveScriptResultPartFinished, TString, TString, i32, TMaybe<TInstant>, i64, i64, Ydb::ResultSet>(SelfId(), Database, ExecutionId, ResultSetId, ExpireAt, FirstRow, AccumulatedSize, ResultSets.back()));

FirstRow += numberRows;
ResultSets.pop_back();
Expand All @@ -1847,15 +1905,17 @@ class TSaveScriptExecutionResultActor : public TActorBootstrapped<TSaveScriptExe
}

STRICT_STFUNC(StateFunc,
hFunc(TEvSaveScriptResultFinished, Handle);
hFunc(TEvSaveScriptResultPartFinished, Handle);
)

void Handle(TEvSaveScriptResultFinished::TPtr& ev) {
void Handle(TEvSaveScriptResultPartFinished::TPtr& ev) {
if (ev->Get()->Status != Ydb::StatusIds::SUCCESS) {
Reply(ev->Get()->Status, std::move(ev->Get()->Issues));
return;
}

AccumulatedSize += ev->Get()->SavedSize;

StartSaveResultQuery();
}

Expand All @@ -1871,6 +1931,7 @@ class TSaveScriptExecutionResultActor : public TActorBootstrapped<TSaveScriptExe
const i32 ResultSetId;
const TMaybe<TInstant> ExpireAt;
i64 FirstRow;
i64 AccumulatedSize;
NFq::TRowsProtoSplitter RowsSplitter;
TVector<Ydb::ResultSet> ResultSets;
};
Expand Down Expand Up @@ -2755,8 +2816,8 @@ NActors::IActor* CreateSaveScriptExecutionResultMetaActor(const NActors::TActorI
return new TQueryRetryActor<TSaveScriptExecutionResultMetaQuery, TEvSaveScriptResultMetaFinished, TString, TString, TString>(runScriptActorId, database, executionId, serializedMeta);
}

NActors::IActor* CreateSaveScriptExecutionResultActor(const NActors::TActorId& runScriptActorId, const TString& database, const TString& executionId, i32 resultSetId, TMaybe<TInstant> expireAt, i64 firstRow, Ydb::ResultSet&& resultSet) {
return new TSaveScriptExecutionResultActor(runScriptActorId, database, executionId, resultSetId, expireAt, firstRow, std::move(resultSet));
NActors::IActor* CreateSaveScriptExecutionResultActor(const NActors::TActorId& runScriptActorId, const TString& database, const TString& executionId, i32 resultSetId, TMaybe<TInstant> expireAt, i64 firstRow, i64 accumulatedSize, Ydb::ResultSet&& resultSet) {
return new TSaveScriptExecutionResultActor(runScriptActorId, database, executionId, resultSetId, expireAt, firstRow, accumulatedSize, std::move(resultSet));
}

NActors::IActor* CreateGetScriptExecutionResultActor(const NActors::TActorId& replyActorId, const TString& database, const TString& executionId, i32 resultSetIndex, i64 offset, i64 limit) {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/proxy_service/kqp_script_executions.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ NActors::IActor* CreateScriptLeaseUpdateActor(const TActorId& runScriptActorId,

// Store and fetch results.
NActors::IActor* CreateSaveScriptExecutionResultMetaActor(const NActors::TActorId& runScriptActorId, const TString& database, const TString& executionId, const TString& serializedMeta);
NActors::IActor* CreateSaveScriptExecutionResultActor(const NActors::TActorId& runScriptActorId, const TString& database, const TString& executionId, i32 resultSetId, TMaybe<TInstant> expireAt, i64 firstRow, Ydb::ResultSet&& resultSet);
NActors::IActor* CreateSaveScriptExecutionResultActor(const NActors::TActorId& runScriptActorId, const TString& database, const TString& executionId, i32 resultSetId, TMaybe<TInstant> expireAt, i64 firstRow, i64 accumulatedSize, Ydb::ResultSet&& resultSet);
NActors::IActor* CreateGetScriptExecutionResultActor(const NActors::TActorId& replyActorId, const TString& database, const TString& executionId, i32 resultSetId, i64 offset, i64 limit);

// Compute external effects and updates status in database
Expand Down
5 changes: 4 additions & 1 deletion ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ class TRunScriptActor : public NActors::TActorBootstrapped<TRunScriptActor> {
struct TPendingSaveResult {
ui32 ResultSetIndex;
ui64 FirstRow;
ui64 AccumulatedSize;
Ydb::ResultSet ResultSet;

TActorId ReplyActorId;
Expand Down Expand Up @@ -272,7 +273,7 @@ class TRunScriptActor : public NActors::TActorBootstrapped<TRunScriptActor> {
}

TPendingSaveResult& result = PendingSaveResults.back();
Register(CreateSaveScriptExecutionResultActor(SelfId(), Database, ExecutionId, result.ResultSetIndex, ExpireAt, result.FirstRow, std::move(result.ResultSet)));
Register(CreateSaveScriptExecutionResultActor(SelfId(), Database, ExecutionId, result.ResultSetIndex, ExpireAt, result.FirstRow, result.AccumulatedSize, std::move(result.ResultSet)));
SendStreamDataResponse(result.ReplyActorId, std::move(result.SaveResultResponse));

PendingSaveResults.pop_back();
Expand Down Expand Up @@ -313,6 +314,7 @@ class TRunScriptActor : public NActors::TActorBootstrapped<TRunScriptActor> {
auto& rowCount = ResultSetRowCount[resultSetIndex];
auto& byteCount = ResultSetByteCount[resultSetIndex];
auto firstRow = rowCount;
auto accumulatedSize = byteCount;

Ydb::ResultSet resultSet;
for (auto& row : *ev->Get()->Record.MutableResultSet()->mutable_rows()) {
Expand Down Expand Up @@ -365,6 +367,7 @@ class TRunScriptActor : public NActors::TActorBootstrapped<TRunScriptActor> {
PendingSaveResults.push_back({
resultSetIndex,
firstRow,
accumulatedSize,
std::move(resultSet),
ev->Sender,
std::move(resp)
Expand Down
Loading