@@ -185,6 +185,7 @@ class TScriptExecutionsTablesCreator : public TActorBootstrapped<TScriptExecutio
185185 Col (" row_id" , NScheme::NTypeIds::Int64),
186186 Col (" expire_at" , NScheme::NTypeIds::Timestamp),
187187 Col (" result_set" , NScheme::NTypeIds::String),
188+ Col (" accumulated_size" , NScheme::NTypeIds::Int64),
188189 },
189190 { " database" , " execution_id" , " result_set_id" , " row_id" },
190191 NKikimrServices::KQP_PROXY,
@@ -832,9 +833,11 @@ class TCheckLeaseStatusActor : public TCheckLeaseStatusActorBase {
832833
833834class TForgetScriptExecutionOperationQueryActor : public TQueryBase {
834835 static constexpr i64 MAX_NUMBER_ROWS_IN_BATCH = 100000 ;
836+ static constexpr i64 MAX_BATCH_SIZE = 10_MB;
835837
836838 struct TResultSetDescription {
837839 i64 MaxRowId;
840+ i64 MaxAccumulatedSize;
838841 i32 ResultSetId;
839842 };
840843
@@ -859,7 +862,7 @@ class TForgetScriptExecutionOperationQueryActor : public TQueryBase {
859862 FROM `.metadata/script_executions`
860863 WHERE database = $database AND execution_id = $execution_id;
861864
862- SELECT result_set_id, MAX(row_id) AS max_row_id
865+ SELECT result_set_id, MAX(row_id) AS max_row_id, MAX(accumulated_size) AS max_accumulated_size
863866 FROM `.metadata/result_sets`
864867 WHERE database = $database AND execution_id = $execution_id
865868 GROUP BY result_set_id;
@@ -908,17 +911,15 @@ class TForgetScriptExecutionOperationQueryActor : public TQueryBase {
908911 return ;
909912 }
910913
911- ResultSetsDescription.emplace_back (TResultSetDescription{*maxRowId, *resultSetId});
914+ i64 maxAccumulatedSize = result.ColumnParser (" max_accumulated_size" ).GetOptionalInt64 ().GetOrElse (0 );
915+
916+ ResultSetsDescription.emplace_back (TResultSetDescription{*maxRowId, maxAccumulatedSize, *resultSetId});
912917 }
913918
914919 DeleteScriptResults ();
915920 }
916921
917922 void DeleteScriptResults () {
918- while (!ResultSetsDescription.empty () && ResultSetsDescription.back ().MaxRowId < 0 ) {
919- ResultSetsDescription.pop_back ();
920- }
921-
922923 if (ResultSetsDescription.empty ()) {
923924 Finish ();
924925 return ;
@@ -928,22 +929,32 @@ class TForgetScriptExecutionOperationQueryActor : public TQueryBase {
928929 return ;
929930 }
930931
931- TResultSetDescription& resultSet = ResultSetsDescription.back ();
932- resultSet.MaxRowId -= MAX_NUMBER_ROWS_IN_BATCH;
932+ const TResultSetDescription& resultSet = ResultSetsDescription.back ();
933933
934934 TString sql = R"(
935935 -- TForgetScriptExecutionOperationQueryActor::DeleteScriptResults
936936 DECLARE $database AS Text;
937937 DECLARE $execution_id AS Text;
938938 DECLARE $result_set_id AS Int32;
939939 DECLARE $max_row_id AS Int64;
940+ DECLARE $max_rows_in_batch AS Int64;
941+ DECLARE $min_accumulated_size AS Int64;
940942
941943 DELETE
942944 FROM `.metadata/result_sets`
943945 WHERE database = $database
944946 AND execution_id = $execution_id
945947 AND result_set_id = $result_set_id
946- AND row_id > $max_row_id;
948+ AND (row_id = $max_row_id OR (
949+ $max_row_id - row_id < $max_rows_in_batch
950+ AND (accumulated_size IS NULL OR accumulated_size - LEN(result_set) >= $min_accumulated_size)
951+ ));
952+
953+ SELECT MAX(row_id) AS max_row_id, MAX(accumulated_size) AS max_accumulated_size
954+ FROM `.metadata/result_sets`
955+ WHERE database = $database
956+ AND execution_id = $execution_id
957+ AND result_set_id = $result_set_id;
947958 )" ;
948959
949960 NYdb::TParamsBuilder params;
@@ -959,10 +970,42 @@ class TForgetScriptExecutionOperationQueryActor : public TQueryBase {
959970 .Build ()
960971 .AddParam (" $max_row_id" )
961972 .Int64 (resultSet.MaxRowId )
973+ .Build ()
974+ .AddParam (" $max_rows_in_batch" )
975+ .Int64 (MAX_NUMBER_ROWS_IN_BATCH)
976+ .Build ()
977+ .AddParam (" $min_accumulated_size" )
978+ .Int64 (resultSet.MaxAccumulatedSize - MAX_BATCH_SIZE)
962979 .Build ();
963980
964981 RunDataQuery (sql, ¶ms);
965- SetQueryResultHandler (&TForgetScriptExecutionOperationQueryActor::DeleteScriptResults);
982+ SetQueryResultHandler (&TForgetScriptExecutionOperationQueryActor::OnResultsDeleted);
983+ }
984+
985+ void OnResultsDeleted () {
986+ if (ResultSets.size () != 1 ) {
987+ Finish (Ydb::StatusIds::INTERNAL_ERROR, " Unexpected database response" );
988+ return ;
989+ }
990+
991+ NYdb::TResultSetParser result (ResultSets[0 ]);
992+ if (result.RowsCount () != 1 ) {
993+ Finish (Ydb::StatusIds::INTERNAL_ERROR, " Unexpected database response" );
994+ return ;
995+ }
996+
997+ result.TryNextRow ();
998+ TMaybe<i64 > maxRowId = result.ColumnParser (" max_row_id" ).GetOptionalInt64 ();
999+ TMaybe<i64 > maxAccumulatedSize = result.ColumnParser (" max_accumulated_size" ).GetOptionalInt64 ();
1000+
1001+ if (maxRowId) {
1002+ ResultSetsDescription.back ().MaxRowId = *maxRowId;
1003+ ResultSetsDescription.back ().MaxAccumulatedSize = maxAccumulatedSize.GetOrElse (0 );
1004+ } else {
1005+ ResultSetsDescription.pop_back ();
1006+ }
1007+
1008+ DeleteScriptResults ();
9661009 }
9671010
9681011 void OnFinish (Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) override {
@@ -1723,10 +1766,16 @@ class TSaveScriptExecutionResultMetaQuery : public TQueryBase {
17231766
17241767class TSaveScriptExecutionResultQuery : public TQueryBase {
17251768public:
1726- TSaveScriptExecutionResultQuery (const TString& database, const TString& executionId, i32 resultSetId, TMaybe<TInstant> expireAt, i64 firstRow, Ydb::ResultSet resultSet)
1727- : Database(database), ExecutionId(executionId), ResultSetId(resultSetId), ExpireAt(expireAt), FirstRow(firstRow), ResultSet(std::move(resultSet))
1728- {
1729- }
1769+ TSaveScriptExecutionResultQuery (const TString& database, const TString& executionId, i32 resultSetId,
1770+ TMaybe<TInstant> expireAt, i64 firstRow, i64 accumulatedSize, Ydb::ResultSet resultSet)
1771+ : Database(database)
1772+ , ExecutionId(executionId)
1773+ , ResultSetId(resultSetId)
1774+ , ExpireAt(expireAt)
1775+ , FirstRow(firstRow)
1776+ , AccumulatedSize(accumulatedSize)
1777+ , ResultSet(std::move(resultSet))
1778+ {}
17301779
17311780 void OnRunQuery () override {
17321781 TString sql = R"(
@@ -1735,11 +1784,11 @@ class TSaveScriptExecutionResultQuery : public TQueryBase {
17351784 DECLARE $execution_id AS Text;
17361785 DECLARE $result_set_id AS Int32;
17371786 DECLARE $expire_at AS Optional<Timestamp>;
1738- DECLARE $items AS List<Struct<row_id:Int64,result_set:String>>;
1787+ DECLARE $items AS List<Struct<row_id:Int64,result_set:String,accumulated_size:Int64 >>;
17391788
17401789 UPSERT INTO `.metadata/result_sets`
17411790 SELECT $database as database, $execution_id as execution_id, $result_set_id as result_set_id,
1742- T.row_id as row_id, $expire_at as expire_at, T.result_set as result_set
1791+ T.row_id as row_id, $expire_at as expire_at, T.result_set as result_set, T.accumulated_size as accumulated_size
17431792 FROM AS_TABLE($items) AS T;
17441793 )" ;
17451794
@@ -1765,14 +1814,18 @@ class TSaveScriptExecutionResultQuery : public TQueryBase {
17651814 .BeginList ();
17661815
17671816 auto row = FirstRow;
1768- for (auto & rowValue : ResultSet.rows ()) {
1817+ for (const auto & rowValue : ResultSet.rows ()) {
1818+ auto rowValueSerialized = rowValue.SerializeAsString ();
1819+ SavedSize += rowValueSerialized.size ();
17691820 param
17701821 .AddListItem ()
17711822 .BeginStruct ()
17721823 .AddMember (" row_id" )
17731824 .Int64 (row++)
17741825 .AddMember (" result_set" )
1775- .String (rowValue.SerializeAsString ())
1826+ .String (std::move (rowValueSerialized))
1827+ .AddMember (" accumulated_size" )
1828+ .Int64 (AccumulatedSize + SavedSize)
17761829 .EndStruct ();
17771830 }
17781831 param
@@ -1788,9 +1841,9 @@ class TSaveScriptExecutionResultQuery : public TQueryBase {
17881841
17891842 void OnFinish (Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) override {
17901843 if (status == Ydb::StatusIds::SUCCESS) {
1791- Send (Owner, new TEvSaveScriptResultFinished (status));
1844+ Send (Owner, new TEvSaveScriptResultPartFinished (status, SavedSize ));
17921845 } else {
1793- Send (Owner, new TEvSaveScriptResultFinished (status, std::move (issues)));
1846+ Send (Owner, new TEvSaveScriptResultPartFinished (status, SavedSize , std::move (issues)));
17941847 }
17951848 }
17961849
@@ -1800,7 +1853,9 @@ class TSaveScriptExecutionResultQuery : public TQueryBase {
18001853 const i32 ResultSetId;
18011854 const TMaybe<TInstant> ExpireAt;
18021855 const i64 FirstRow;
1856+ const i64 AccumulatedSize;
18031857 const Ydb::ResultSet ResultSet;
1858+ i64 SavedSize = 0 ;
18041859};
18051860
18061861class TSaveScriptExecutionResultActor : public TActorBootstrapped <TSaveScriptExecutionResultActor> {
@@ -1809,13 +1864,16 @@ class TSaveScriptExecutionResultActor : public TActorBootstrapped<TSaveScriptExe
18091864 static constexpr ui64 PROGRAM_BASE_SIZE = 1_MB; // Depends on MAX_NUMBER_ROWS_IN_BATCH
18101865
18111866public:
1812- TSaveScriptExecutionResultActor (const NActors::TActorId& replyActorId, const TString& database, const TString& executionId, i32 resultSetId, TMaybe<TInstant> expireAt, i64 firstRow, Ydb::ResultSet&& resultSet)
1867+ TSaveScriptExecutionResultActor (const NActors::TActorId& replyActorId, const TString& database,
1868+ const TString& executionId, i32 resultSetId, TMaybe<TInstant> expireAt,
1869+ i64 firstRow, i64 accumulatedSize, Ydb::ResultSet&& resultSet)
18131870 : ReplyActorId(replyActorId)
18141871 , Database(database)
18151872 , ExecutionId(executionId)
18161873 , ResultSetId(resultSetId)
18171874 , ExpireAt(expireAt)
18181875 , FirstRow(firstRow)
1876+ , AccumulatedSize(accumulatedSize)
18191877 , RowsSplitter(std::move(resultSet), PROGRAM_SIZE_LIMIT, PROGRAM_BASE_SIZE, MAX_NUMBER_ROWS_IN_BATCH)
18201878 {}
18211879
@@ -1826,7 +1884,7 @@ class TSaveScriptExecutionResultActor : public TActorBootstrapped<TSaveScriptExe
18261884 }
18271885
18281886 i64 numberRows = ResultSets.back ().rows_size ();
1829- Register (new TQueryRetryActor<TSaveScriptExecutionResultQuery, TEvSaveScriptResultFinished , TString, TString, i32 , TMaybe<TInstant>, i64 , Ydb::ResultSet>(SelfId (), Database, ExecutionId, ResultSetId, ExpireAt, FirstRow, ResultSets.back ()));
1887+ Register (new TQueryRetryActor<TSaveScriptExecutionResultQuery, TEvSaveScriptResultPartFinished , TString, TString, i32 , TMaybe<TInstant>, i64 , i64 , Ydb::ResultSet>(SelfId (), Database, ExecutionId, ResultSetId, ExpireAt, FirstRow, AccumulatedSize , ResultSets.back ()));
18301888
18311889 FirstRow += numberRows;
18321890 ResultSets.pop_back ();
@@ -1847,15 +1905,17 @@ class TSaveScriptExecutionResultActor : public TActorBootstrapped<TSaveScriptExe
18471905 }
18481906
18491907 STRICT_STFUNC (StateFunc,
1850- hFunc (TEvSaveScriptResultFinished , Handle);
1908+ hFunc (TEvSaveScriptResultPartFinished , Handle);
18511909 )
18521910
1853- void Handle (TEvSaveScriptResultFinished ::TPtr& ev) {
1911+ void Handle (TEvSaveScriptResultPartFinished ::TPtr& ev) {
18541912 if (ev->Get ()->Status != Ydb::StatusIds::SUCCESS) {
18551913 Reply (ev->Get ()->Status , std::move (ev->Get ()->Issues ));
18561914 return ;
18571915 }
18581916
1917+ AccumulatedSize += ev->Get ()->SavedSize ;
1918+
18591919 StartSaveResultQuery ();
18601920 }
18611921
@@ -1871,6 +1931,7 @@ class TSaveScriptExecutionResultActor : public TActorBootstrapped<TSaveScriptExe
18711931 const i32 ResultSetId;
18721932 const TMaybe<TInstant> ExpireAt;
18731933 i64 FirstRow;
1934+ i64 AccumulatedSize;
18741935 NFq::TRowsProtoSplitter RowsSplitter;
18751936 TVector<Ydb::ResultSet> ResultSets;
18761937};
@@ -2755,8 +2816,8 @@ NActors::IActor* CreateSaveScriptExecutionResultMetaActor(const NActors::TActorI
27552816 return new TQueryRetryActor<TSaveScriptExecutionResultMetaQuery, TEvSaveScriptResultMetaFinished, TString, TString, TString>(runScriptActorId, database, executionId, serializedMeta);
27562817}
27572818
2758- NActors::IActor* CreateSaveScriptExecutionResultActor (const NActors::TActorId& runScriptActorId, const TString& database, const TString& executionId, i32 resultSetId, TMaybe<TInstant> expireAt, i64 firstRow, Ydb::ResultSet&& resultSet) {
2759- return new TSaveScriptExecutionResultActor (runScriptActorId, database, executionId, resultSetId, expireAt, firstRow, std::move (resultSet));
2819+ NActors::IActor* CreateSaveScriptExecutionResultActor (const NActors::TActorId& runScriptActorId, const TString& database, const TString& executionId, i32 resultSetId, TMaybe<TInstant> expireAt, i64 firstRow, i64 accumulatedSize, Ydb::ResultSet&& resultSet) {
2820+ return new TSaveScriptExecutionResultActor (runScriptActorId, database, executionId, resultSetId, expireAt, firstRow, accumulatedSize, std::move (resultSet));
27602821}
27612822
27622823NActors::IActor* CreateGetScriptExecutionResultActor (const NActors::TActorId& replyActorId, const TString& database, const TString& executionId, i32 resultSetIndex, i64 offset, i64 limit) {
0 commit comments