Skip to content

Commit 0ee1468

Browse files
authored
Merge ba12686 into db5a3b9
2 parents db5a3b9 + ba12686 commit 0ee1468

File tree

14 files changed

+272
-32
lines changed

14 files changed

+272
-32
lines changed

ydb/core/fq/libs/compute/ydb/resources_cleaner_actor.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ class TResourcesCleanerActor : public TBaseComputeActor<TResourcesCleanerActor>
8888
void Handle(const TEvYdbCompute::TEvForgetOperationResponse::TPtr& ev) {
8989
const auto& response = *ev.Get()->Get();
9090
if (response.Status == NYdb::EStatus::TIMEOUT || response.Status == NYdb::EStatus::CLIENT_DEADLINE_EXCEEDED) {
91+
LOG_I("Operation partly forgotten, will be retried: " << response.Status);
9192
SendForgetOperation(TDuration::MilliSeconds(BackoffTimer.NextBackoffMs()));
9293
return;
9394
}

ydb/core/fq/libs/compute/ydb/ydb_run_actor.cpp

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -114,10 +114,14 @@ class TYdbRunActor : public NActors::TActorBootstrapped<TYdbRunActor> {
114114
ResignAndPassAway(response.Issues);
115115
return;
116116
}
117-
ExecStatus = response.ExecStatus;
118-
Params.Status = response.ComputeStatus;
117+
118+
if (!OperationIsFailing() || response.ExecStatus != NYdb::NQuery::EExecStatus::Completed) {
119+
ExecStatus = response.ExecStatus;
120+
Params.Status = response.ComputeStatus;
121+
}
122+
119123
LOG_I("StatusTrackerResponse (success) " << response.Status << " ExecStatus: " << static_cast<int>(response.ExecStatus) << " Issues: " << response.Issues.ToOneLineString());
120-
if (response.ExecStatus == NYdb::NQuery::EExecStatus::Completed) {
124+
if (ExecStatus == NYdb::NQuery::EExecStatus::Completed) {
121125
Register(ActorFactory->CreateResultWriter(SelfId(), Connector, Pinger, Params.OperationId, true).release());
122126
} else {
123127
CreateResourcesCleaner();
@@ -248,6 +252,12 @@ class TYdbRunActor : public NActors::TActorBootstrapped<TYdbRunActor> {
248252
return true;
249253
}
250254

255+
bool OperationIsFailing() const {
256+
return Params.Status == FederatedQuery::QueryMeta::FAILING
257+
|| Params.Status == FederatedQuery::QueryMeta::ABORTING_BY_USER
258+
|| Params.Status == FederatedQuery::QueryMeta::ABORTING_BY_SYSTEM;
259+
}
260+
251261
private:
252262
bool IsAborted = false;
253263
bool FinalizationStarted = false;

ydb/core/kqp/common/events/script_executions.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,18 @@ struct TEvSaveScriptResultMetaFinished : public NActors::TEventLocal<TEvSaveScri
198198
NYql::TIssues Issues;
199199
};
200200

201+
struct TEvSaveScriptResultPartFinished : public NActors::TEventLocal<TEvSaveScriptResultPartFinished, TKqpScriptExecutionEvents::EvSaveScriptResultPartFinished> {
202+
TEvSaveScriptResultPartFinished(Ydb::StatusIds::StatusCode status, i64 savedSize, NYql::TIssues issues = {})
203+
: Status(status)
204+
, SavedSize(savedSize)
205+
, Issues(std::move(issues))
206+
{}
207+
208+
Ydb::StatusIds::StatusCode Status;
209+
i64 SavedSize;
210+
NYql::TIssues Issues;
211+
};
212+
201213
struct TEvSaveScriptResultFinished : public NActors::TEventLocal<TEvSaveScriptResultFinished, TKqpScriptExecutionEvents::EvSaveScriptResultFinished> {
202214
TEvSaveScriptResultFinished(Ydb::StatusIds::StatusCode status, NYql::TIssues issues = {})
203215
: Status(status)

ydb/core/kqp/common/simple/kqp_event_ids.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,7 @@ struct TKqpScriptExecutionEvents {
151151
EvSaveScriptFinalStatusResponse,
152152
EvGetScriptExecutionOperationQueryResponse,
153153
EvDescribeSecretsResponse,
154+
EvSaveScriptResultPartFinished,
154155
};
155156
};
156157

ydb/core/kqp/proxy_service/kqp_script_executions.cpp

Lines changed: 87 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,7 @@ class TScriptExecutionsTablesCreator : public TActorBootstrapped<TScriptExecutio
185185
Col("row_id", NScheme::NTypeIds::Int64),
186186
Col("expire_at", NScheme::NTypeIds::Timestamp),
187187
Col("result_set", NScheme::NTypeIds::String),
188+
Col("accumulated_size", NScheme::NTypeIds::Int64),
188189
},
189190
{ "database", "execution_id", "result_set_id", "row_id" },
190191
NKikimrServices::KQP_PROXY,
@@ -832,9 +833,11 @@ class TCheckLeaseStatusActor : public TCheckLeaseStatusActorBase {
832833

833834
class TForgetScriptExecutionOperationQueryActor : public TQueryBase {
834835
static constexpr i64 MAX_NUMBER_ROWS_IN_BATCH = 100000;
836+
static constexpr i64 MAX_BATCH_SIZE = 10_MB;
835837

836838
struct TResultSetDescription {
837839
i64 MaxRowId;
840+
i64 MaxAccumulatedSize;
838841
i32 ResultSetId;
839842
};
840843

@@ -859,7 +862,7 @@ class TForgetScriptExecutionOperationQueryActor : public TQueryBase {
859862
FROM `.metadata/script_executions`
860863
WHERE database = $database AND execution_id = $execution_id;
861864
862-
SELECT result_set_id, MAX(row_id) AS max_row_id
865+
SELECT result_set_id, MAX(row_id) AS max_row_id, MAX(accumulated_size) AS max_accumulated_size
863866
FROM `.metadata/result_sets`
864867
WHERE database = $database AND execution_id = $execution_id
865868
GROUP BY result_set_id;
@@ -908,17 +911,15 @@ class TForgetScriptExecutionOperationQueryActor : public TQueryBase {
908911
return;
909912
}
910913

911-
ResultSetsDescription.emplace_back(TResultSetDescription{*maxRowId, *resultSetId});
914+
i64 maxAccumulatedSize = result.ColumnParser("max_accumulated_size").GetOptionalInt64().GetOrElse(0);
915+
916+
ResultSetsDescription.emplace_back(TResultSetDescription{*maxRowId, maxAccumulatedSize, *resultSetId});
912917
}
913918

914919
DeleteScriptResults();
915920
}
916921

917922
void DeleteScriptResults() {
918-
while (!ResultSetsDescription.empty() && ResultSetsDescription.back().MaxRowId < 0) {
919-
ResultSetsDescription.pop_back();
920-
}
921-
922923
if (ResultSetsDescription.empty()) {
923924
Finish();
924925
return;
@@ -928,22 +929,32 @@ class TForgetScriptExecutionOperationQueryActor : public TQueryBase {
928929
return;
929930
}
930931

931-
TResultSetDescription& resultSet = ResultSetsDescription.back();
932-
resultSet.MaxRowId -= MAX_NUMBER_ROWS_IN_BATCH;
932+
const TResultSetDescription& resultSet = ResultSetsDescription.back();
933933

934934
TString sql = R"(
935935
-- TForgetScriptExecutionOperationQueryActor::DeleteScriptResults
936936
DECLARE $database AS Text;
937937
DECLARE $execution_id AS Text;
938938
DECLARE $result_set_id AS Int32;
939939
DECLARE $max_row_id AS Int64;
940+
DECLARE $max_rows_in_batch AS Int64;
941+
DECLARE $min_accumulated_size AS Int64;
940942
941943
DELETE
942944
FROM `.metadata/result_sets`
943945
WHERE database = $database
944946
AND execution_id = $execution_id
945947
AND result_set_id = $result_set_id
946-
AND row_id > $max_row_id;
948+
AND (row_id = $max_row_id OR (
949+
$max_row_id - row_id < $max_rows_in_batch
950+
AND (accumulated_size IS NULL OR accumulated_size - LEN(result_set) >= $min_accumulated_size)
951+
));
952+
953+
SELECT MAX(row_id) AS max_row_id, MAX(accumulated_size) AS max_accumulated_size
954+
FROM `.metadata/result_sets`
955+
WHERE database = $database
956+
AND execution_id = $execution_id
957+
AND result_set_id = $result_set_id;
947958
)";
948959

949960
NYdb::TParamsBuilder params;
@@ -959,10 +970,42 @@ class TForgetScriptExecutionOperationQueryActor : public TQueryBase {
959970
.Build()
960971
.AddParam("$max_row_id")
961972
.Int64(resultSet.MaxRowId)
973+
.Build()
974+
.AddParam("$max_rows_in_batch")
975+
.Int64(MAX_NUMBER_ROWS_IN_BATCH)
976+
.Build()
977+
.AddParam("$min_accumulated_size")
978+
.Int64(resultSet.MaxAccumulatedSize - MAX_BATCH_SIZE)
962979
.Build();
963980

964981
RunDataQuery(sql, &params);
965-
SetQueryResultHandler(&TForgetScriptExecutionOperationQueryActor::DeleteScriptResults);
982+
SetQueryResultHandler(&TForgetScriptExecutionOperationQueryActor::OnResultsDeleted);
983+
}
984+
985+
void OnResultsDeleted() {
986+
if (ResultSets.size() != 1) {
987+
Finish(Ydb::StatusIds::INTERNAL_ERROR, "Unexpected database response");
988+
return;
989+
}
990+
991+
NYdb::TResultSetParser result(ResultSets[0]);
992+
if (result.RowsCount() != 1) {
993+
Finish(Ydb::StatusIds::INTERNAL_ERROR, "Unexpected database response");
994+
return;
995+
}
996+
997+
result.TryNextRow();
998+
TMaybe<i64> maxRowId = result.ColumnParser("max_row_id").GetOptionalInt64();
999+
TMaybe<i64> maxAccumulatedSize = result.ColumnParser("max_accumulated_size").GetOptionalInt64();
1000+
1001+
if (maxRowId) {
1002+
ResultSetsDescription.back().MaxRowId = *maxRowId;
1003+
ResultSetsDescription.back().MaxAccumulatedSize = maxAccumulatedSize.GetOrElse(0);
1004+
} else {
1005+
ResultSetsDescription.pop_back();
1006+
}
1007+
1008+
DeleteScriptResults();
9661009
}
9671010

9681011
void OnFinish(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) override {
@@ -1723,10 +1766,16 @@ class TSaveScriptExecutionResultMetaQuery : public TQueryBase {
17231766

17241767
class TSaveScriptExecutionResultQuery : public TQueryBase {
17251768
public:
1726-
TSaveScriptExecutionResultQuery(const TString& database, const TString& executionId, i32 resultSetId, TMaybe<TInstant> expireAt, i64 firstRow, Ydb::ResultSet resultSet)
1727-
: Database(database), ExecutionId(executionId), ResultSetId(resultSetId), ExpireAt(expireAt), FirstRow(firstRow), ResultSet(std::move(resultSet))
1728-
{
1729-
}
1769+
TSaveScriptExecutionResultQuery(const TString& database, const TString& executionId, i32 resultSetId,
1770+
TMaybe<TInstant> expireAt, i64 firstRow, i64 accumulatedSize, Ydb::ResultSet resultSet)
1771+
: Database(database)
1772+
, ExecutionId(executionId)
1773+
, ResultSetId(resultSetId)
1774+
, ExpireAt(expireAt)
1775+
, FirstRow(firstRow)
1776+
, AccumulatedSize(accumulatedSize)
1777+
, ResultSet(std::move(resultSet))
1778+
{}
17301779

17311780
void OnRunQuery() override {
17321781
TString sql = R"(
@@ -1735,11 +1784,11 @@ class TSaveScriptExecutionResultQuery : public TQueryBase {
17351784
DECLARE $execution_id AS Text;
17361785
DECLARE $result_set_id AS Int32;
17371786
DECLARE $expire_at AS Optional<Timestamp>;
1738-
DECLARE $items AS List<Struct<row_id:Int64,result_set:String>>;
1787+
DECLARE $items AS List<Struct<row_id:Int64,result_set:String,accumulated_size:Int64>>;
17391788
17401789
UPSERT INTO `.metadata/result_sets`
17411790
SELECT $database as database, $execution_id as execution_id, $result_set_id as result_set_id,
1742-
T.row_id as row_id, $expire_at as expire_at, T.result_set as result_set
1791+
T.row_id as row_id, $expire_at as expire_at, T.result_set as result_set, T.accumulated_size as accumulated_size
17431792
FROM AS_TABLE($items) AS T;
17441793
)";
17451794

@@ -1765,14 +1814,18 @@ class TSaveScriptExecutionResultQuery : public TQueryBase {
17651814
.BeginList();
17661815

17671816
auto row = FirstRow;
1768-
for(auto& rowValue : ResultSet.rows()) {
1817+
for (const auto& rowValue : ResultSet.rows()) {
1818+
auto rowValueSerialized = rowValue.SerializeAsString();
1819+
SavedSize += rowValueSerialized.size();
17691820
param
17701821
.AddListItem()
17711822
.BeginStruct()
17721823
.AddMember("row_id")
17731824
.Int64(row++)
17741825
.AddMember("result_set")
1775-
.String(rowValue.SerializeAsString())
1826+
.String(std::move(rowValueSerialized))
1827+
.AddMember("accumulated_size")
1828+
.Int64(AccumulatedSize + SavedSize)
17761829
.EndStruct();
17771830
}
17781831
param
@@ -1788,9 +1841,9 @@ class TSaveScriptExecutionResultQuery : public TQueryBase {
17881841

17891842
void OnFinish(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) override {
17901843
if (status == Ydb::StatusIds::SUCCESS) {
1791-
Send(Owner, new TEvSaveScriptResultFinished(status));
1844+
Send(Owner, new TEvSaveScriptResultPartFinished(status, SavedSize));
17921845
} else {
1793-
Send(Owner, new TEvSaveScriptResultFinished(status, std::move(issues)));
1846+
Send(Owner, new TEvSaveScriptResultPartFinished(status, SavedSize, std::move(issues)));
17941847
}
17951848
}
17961849

@@ -1800,7 +1853,9 @@ class TSaveScriptExecutionResultQuery : public TQueryBase {
18001853
const i32 ResultSetId;
18011854
const TMaybe<TInstant> ExpireAt;
18021855
const i64 FirstRow;
1856+
const i64 AccumulatedSize;
18031857
const Ydb::ResultSet ResultSet;
1858+
i64 SavedSize = 0;
18041859
};
18051860

18061861
class TSaveScriptExecutionResultActor : public TActorBootstrapped<TSaveScriptExecutionResultActor> {
@@ -1809,13 +1864,16 @@ class TSaveScriptExecutionResultActor : public TActorBootstrapped<TSaveScriptExe
18091864
static constexpr ui64 PROGRAM_BASE_SIZE = 1_MB; // Depends on MAX_NUMBER_ROWS_IN_BATCH
18101865

18111866
public:
1812-
TSaveScriptExecutionResultActor(const NActors::TActorId& replyActorId, const TString& database, const TString& executionId, i32 resultSetId, TMaybe<TInstant> expireAt, i64 firstRow, Ydb::ResultSet&& resultSet)
1867+
TSaveScriptExecutionResultActor(const NActors::TActorId& replyActorId, const TString& database,
1868+
const TString& executionId, i32 resultSetId, TMaybe<TInstant> expireAt,
1869+
i64 firstRow, i64 accumulatedSize, Ydb::ResultSet&& resultSet)
18131870
: ReplyActorId(replyActorId)
18141871
, Database(database)
18151872
, ExecutionId(executionId)
18161873
, ResultSetId(resultSetId)
18171874
, ExpireAt(expireAt)
18181875
, FirstRow(firstRow)
1876+
, AccumulatedSize(accumulatedSize)
18191877
, RowsSplitter(std::move(resultSet), PROGRAM_SIZE_LIMIT, PROGRAM_BASE_SIZE, MAX_NUMBER_ROWS_IN_BATCH)
18201878
{}
18211879

@@ -1826,7 +1884,7 @@ class TSaveScriptExecutionResultActor : public TActorBootstrapped<TSaveScriptExe
18261884
}
18271885

18281886
i64 numberRows = ResultSets.back().rows_size();
1829-
Register(new TQueryRetryActor<TSaveScriptExecutionResultQuery, TEvSaveScriptResultFinished, TString, TString, i32, TMaybe<TInstant>, i64, Ydb::ResultSet>(SelfId(), Database, ExecutionId, ResultSetId, ExpireAt, FirstRow, ResultSets.back()));
1887+
Register(new TQueryRetryActor<TSaveScriptExecutionResultQuery, TEvSaveScriptResultPartFinished, TString, TString, i32, TMaybe<TInstant>, i64, i64, Ydb::ResultSet>(SelfId(), Database, ExecutionId, ResultSetId, ExpireAt, FirstRow, AccumulatedSize, ResultSets.back()));
18301888

18311889
FirstRow += numberRows;
18321890
ResultSets.pop_back();
@@ -1847,15 +1905,17 @@ class TSaveScriptExecutionResultActor : public TActorBootstrapped<TSaveScriptExe
18471905
}
18481906

18491907
STRICT_STFUNC(StateFunc,
1850-
hFunc(TEvSaveScriptResultFinished, Handle);
1908+
hFunc(TEvSaveScriptResultPartFinished, Handle);
18511909
)
18521910

1853-
void Handle(TEvSaveScriptResultFinished::TPtr& ev) {
1911+
void Handle(TEvSaveScriptResultPartFinished::TPtr& ev) {
18541912
if (ev->Get()->Status != Ydb::StatusIds::SUCCESS) {
18551913
Reply(ev->Get()->Status, std::move(ev->Get()->Issues));
18561914
return;
18571915
}
18581916

1917+
AccumulatedSize += ev->Get()->SavedSize;
1918+
18591919
StartSaveResultQuery();
18601920
}
18611921

@@ -1871,6 +1931,7 @@ class TSaveScriptExecutionResultActor : public TActorBootstrapped<TSaveScriptExe
18711931
const i32 ResultSetId;
18721932
const TMaybe<TInstant> ExpireAt;
18731933
i64 FirstRow;
1934+
i64 AccumulatedSize;
18741935
NFq::TRowsProtoSplitter RowsSplitter;
18751936
TVector<Ydb::ResultSet> ResultSets;
18761937
};
@@ -2755,8 +2816,8 @@ NActors::IActor* CreateSaveScriptExecutionResultMetaActor(const NActors::TActorI
27552816
return new TQueryRetryActor<TSaveScriptExecutionResultMetaQuery, TEvSaveScriptResultMetaFinished, TString, TString, TString>(runScriptActorId, database, executionId, serializedMeta);
27562817
}
27572818

2758-
NActors::IActor* CreateSaveScriptExecutionResultActor(const NActors::TActorId& runScriptActorId, const TString& database, const TString& executionId, i32 resultSetId, TMaybe<TInstant> expireAt, i64 firstRow, Ydb::ResultSet&& resultSet) {
2759-
return new TSaveScriptExecutionResultActor(runScriptActorId, database, executionId, resultSetId, expireAt, firstRow, std::move(resultSet));
2819+
NActors::IActor* CreateSaveScriptExecutionResultActor(const NActors::TActorId& runScriptActorId, const TString& database, const TString& executionId, i32 resultSetId, TMaybe<TInstant> expireAt, i64 firstRow, i64 accumulatedSize, Ydb::ResultSet&& resultSet) {
2820+
return new TSaveScriptExecutionResultActor(runScriptActorId, database, executionId, resultSetId, expireAt, firstRow, accumulatedSize, std::move(resultSet));
27602821
}
27612822

27622823
NActors::IActor* CreateGetScriptExecutionResultActor(const NActors::TActorId& replyActorId, const TString& database, const TString& executionId, i32 resultSetIndex, i64 offset, i64 limit) {

ydb/core/kqp/proxy_service/kqp_script_executions.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ NActors::IActor* CreateScriptLeaseUpdateActor(const TActorId& runScriptActorId,
2828

2929
// Store and fetch results.
3030
NActors::IActor* CreateSaveScriptExecutionResultMetaActor(const NActors::TActorId& runScriptActorId, const TString& database, const TString& executionId, const TString& serializedMeta);
31-
NActors::IActor* CreateSaveScriptExecutionResultActor(const NActors::TActorId& runScriptActorId, const TString& database, const TString& executionId, i32 resultSetId, TMaybe<TInstant> expireAt, i64 firstRow, Ydb::ResultSet&& resultSet);
31+
NActors::IActor* CreateSaveScriptExecutionResultActor(const NActors::TActorId& runScriptActorId, const TString& database, const TString& executionId, i32 resultSetId, TMaybe<TInstant> expireAt, i64 firstRow, i64 accumulatedSize, Ydb::ResultSet&& resultSet);
3232
NActors::IActor* CreateGetScriptExecutionResultActor(const NActors::TActorId& replyActorId, const TString& database, const TString& executionId, i32 resultSetId, i64 offset, i64 limit);
3333

3434
// Compute external effects and updates status in database

ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ class TRunScriptActor : public NActors::TActorBootstrapped<TRunScriptActor> {
5151
struct TPendingSaveResult {
5252
ui32 ResultSetIndex;
5353
ui64 FirstRow;
54+
ui64 AccumulatedSize;
5455
Ydb::ResultSet ResultSet;
5556

5657
TActorId ReplyActorId;
@@ -272,7 +273,7 @@ class TRunScriptActor : public NActors::TActorBootstrapped<TRunScriptActor> {
272273
}
273274

274275
TPendingSaveResult& result = PendingSaveResults.back();
275-
Register(CreateSaveScriptExecutionResultActor(SelfId(), Database, ExecutionId, result.ResultSetIndex, ExpireAt, result.FirstRow, std::move(result.ResultSet)));
276+
Register(CreateSaveScriptExecutionResultActor(SelfId(), Database, ExecutionId, result.ResultSetIndex, ExpireAt, result.FirstRow, result.AccumulatedSize, std::move(result.ResultSet)));
276277
SendStreamDataResponse(result.ReplyActorId, std::move(result.SaveResultResponse));
277278

278279
PendingSaveResults.pop_back();
@@ -313,6 +314,7 @@ class TRunScriptActor : public NActors::TActorBootstrapped<TRunScriptActor> {
313314
auto& rowCount = ResultSetRowCount[resultSetIndex];
314315
auto& byteCount = ResultSetByteCount[resultSetIndex];
315316
auto firstRow = rowCount;
317+
auto accumulatedSize = byteCount;
316318

317319
Ydb::ResultSet resultSet;
318320
for (auto& row : *ev->Get()->Record.MutableResultSet()->mutable_rows()) {
@@ -365,6 +367,7 @@ class TRunScriptActor : public NActors::TActorBootstrapped<TRunScriptActor> {
365367
PendingSaveResults.push_back({
366368
resultSetIndex,
367369
firstRow,
370+
accumulatedSize,
368371
std::move(resultSet),
369372
ev->Sender,
370373
std::move(resp)

0 commit comments

Comments
 (0)