Skip to content

Commit 10da467

Browse files
committed
Fixed forget operation
1 parent e318487 commit 10da467

File tree

7 files changed

+170
-19
lines changed

7 files changed

+170
-19
lines changed

ydb/core/grpc_services/rpc_forget_operation.cpp

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -97,7 +97,7 @@ class TForgetOperationRPC: public TRpcOperationRequestActor<TForgetOperationRPC,
9797
}
9898

9999
// Sends a TEvForgetScriptExecutionOperation to the KQP proxy id built from this
// actor's node id. The removed line passed only the database name and operation
// id; the added line also passes Request->GetDeadline().
void SendForgetScriptExecutionOperation() {
100-
Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), new NKqp::TEvForgetScriptExecutionOperation(DatabaseName, OperationId));
100+
Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), new NKqp::TEvForgetScriptExecutionOperation(DatabaseName, OperationId, Request->GetDeadline()));
101101
}
102102

103103
public:

ydb/core/kqp/common/events/script_executions.h

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -20,14 +20,16 @@ enum EFinalizationStatus : i32 {
2020
};
2121

2222
// Local actor event (NActors::TEventLocal) identified by
// TKqpScriptExecutionEvents::EvForgetScriptExecutionOperation. It carries the
// database name, the operation id and (added by this change) a deadline.
struct TEvForgetScriptExecutionOperation : public NActors::TEventLocal<TEvForgetScriptExecutionOperation, TKqpScriptExecutionEvents::EvForgetScriptExecutionOperation> {
23-
explicit TEvForgetScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id)
23+
explicit TEvForgetScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id, TInstant deadline)
2424
: Database(database)
2525
, OperationId(id)
26+
, Deadline(deadline)
2627
{
2728
}
2829

2930
TString Database;
3031
NOperationId::TOperationId OperationId;
32+
TInstant Deadline; // Copied verbatim from the constructor's `deadline` argument.
3133
};
3234

3335
struct TEvForgetScriptExecutionOperationResponse : public NActors::TEventLocal<TEvForgetScriptExecutionOperationResponse, TKqpScriptExecutionEvents::EvForgetScriptExecutionOperationResponse> {

ydb/core/kqp/proxy_service/kqp_script_executions.cpp

Lines changed: 52 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -45,7 +45,6 @@ namespace {
4545
constexpr TDuration LEASE_DURATION = TDuration::Seconds(30);
4646
constexpr TDuration DEADLINE_OFFSET = TDuration::Minutes(20);
4747
constexpr TDuration BRO_RUN_INTERVAL = TDuration::Minutes(60);
48-
constexpr ui64 MAX_NUMBER_ROWS_IN_BATCH = 10000;
4948

5049
TString SerializeIssues(const NYql::TIssues& issues) {
5150
NYql::TIssue root;
@@ -884,18 +883,25 @@ class TCheckLeaseStatusActor : public TCheckLeaseStatusActorBase {
884883
};
885884

886885
class TForgetScriptExecutionOperationQueryActor : public TQueryBase {
886+
static constexpr i64 MAX_NUMBER_ROWS_IN_BATCH = 10000;
887+
887888
struct TResultSetDescription {
888-
ui64 NumberRows;
889+
i64 MaxRowId;
889890
i32 ResultSetId;
890891
};
891892

892893
public:
893-
TForgetScriptExecutionOperationQueryActor(const TString& executionId, const TString& database)
894+
TForgetScriptExecutionOperationQueryActor(const TString& executionId, const TString& database, TInstant operationDeadline)
894895
: ExecutionId(executionId)
895896
, Database(database)
897+
, Deadline(GetDeadline(operationDeadline))
896898
{}
897899

898900
void OnRunQuery() override {
901+
if (!CheckDeadline()) {
902+
return;
903+
}
904+
899905
TString sql = R"(
900906
-- TForgetScriptExecutionOperationQueryActor::OnRunQuery
901907
DECLARE $database AS Text;
@@ -905,7 +911,7 @@ class TForgetScriptExecutionOperationQueryActor : public TQueryBase {
905911
FROM `.metadata/script_executions`
906912
WHERE database = $database AND execution_id = $execution_id;
907913
908-
SELECT result_set_id, COUNT(row_id) AS number_rows
914+
SELECT result_set_id, MAX(row_id) AS max_row_id
909915
FROM `.metadata/result_sets`
910916
WHERE database = $database AND execution_id = $execution_id
911917
GROUP BY result_set_id;
@@ -944,19 +950,24 @@ class TForgetScriptExecutionOperationQueryActor : public TQueryBase {
944950
while (result.TryNextRow()) {
945951
TMaybe<i32> resultSetId = result.ColumnParser("result_set_id").GetOptionalInt32();
946952
if (!resultSetId) {
947-
continue;
953+
Finish(Ydb::StatusIds::INTERNAL_ERROR, "Result set id is not specified");
954+
return;
948955
}
949956

950-
ui64 numberRows = result.ColumnParser("number_rows").GetUint64();
957+
TMaybe<i64> maxRowId = result.ColumnParser("max_row_id").GetOptionalInt64();
958+
if (!maxRowId) {
959+
Finish(Ydb::StatusIds::INTERNAL_ERROR, "Result set row id is not specified");
960+
return;
961+
}
951962

952-
ResultSetsDescription.emplace_back(TResultSetDescription{numberRows, *resultSetId});
963+
ResultSetsDescription.emplace_back(TResultSetDescription{*maxRowId, *resultSetId});
953964
}
954965

955966
DeleteScriptResults();
956967
}
957968

958969
void DeleteScriptResults() {
959-
while (!ResultSetsDescription.empty() && ResultSetsDescription.back().NumberRows == 0) {
970+
while (!ResultSetsDescription.empty() && ResultSetsDescription.back().MaxRowId < 0) {
960971
ResultSetsDescription.pop_back();
961972
}
962973

@@ -965,22 +976,26 @@ class TForgetScriptExecutionOperationQueryActor : public TQueryBase {
965976
return;
966977
}
967978

979+
if (!CheckDeadline()) {
980+
return;
981+
}
982+
968983
TResultSetDescription& resultSet = ResultSetsDescription.back();
969-
resultSet.NumberRows -= std::min(MAX_NUMBER_ROWS_IN_BATCH, resultSet.NumberRows);
984+
resultSet.MaxRowId -= MAX_NUMBER_ROWS_IN_BATCH;
970985

971986
TString sql = R"(
972987
-- TForgetScriptExecutionOperationQueryActor::DeleteScriptResults
973988
DECLARE $database AS Text;
974989
DECLARE $execution_id AS Text;
975990
DECLARE $result_set_id AS Int32;
976-
DECLARE $last_row_id AS Uint64;
991+
DECLARE $max_row_id AS Int64;
977992
978993
DELETE
979994
FROM `.metadata/result_sets`
980995
WHERE database = $database
981996
AND execution_id = $execution_id
982997
AND result_set_id = $result_set_id
983-
AND row_id >= $last_row_id;
998+
AND row_id > $max_row_id;
984999
)";
9851000

9861001
NYdb::TParamsBuilder params;
@@ -994,8 +1009,8 @@ class TForgetScriptExecutionOperationQueryActor : public TQueryBase {
9941009
.AddParam("$result_set_id")
9951010
.Int32(resultSet.ResultSetId)
9961011
.Build()
997-
.AddParam("$last_row_id")
998-
.Uint64(resultSet.NumberRows)
1012+
.AddParam("$max_row_id")
1013+
.Int64(resultSet.MaxRowId)
9991014
.Build();
10001015

10011016
RunDataQuery(sql, &params);
@@ -1006,9 +1021,26 @@ class TForgetScriptExecutionOperationQueryActor : public TQueryBase {
10061021
Send(Owner, new TEvForgetScriptExecutionOperationResponse(status, std::move(issues)));
10071022
}
10081023

1024+
private:
1025+
// Returns true while the current time is still before Deadline. Once the
// deadline is reached it finishes the query with Ydb::StatusIds::TIMEOUT and
// returns false, so the caller is expected to stop and return immediately.
bool CheckDeadline() {
1026+
if (TInstant::Now() >= Deadline) {
1027+
Finish(Ydb::StatusIds::TIMEOUT, "Forget script execution operation timeout");
1028+
return false;
1029+
}
1030+
return true;
1031+
}
1032+
1033+
// Derives the deadline stored by this actor from the operation deadline:
// - if the operation deadline has already been reached, it is returned as is;
// - otherwise the result is the point halfway between the current time and
//   the operation deadline.
// NOTE(review): TInstant::Now() is read three times below; reading it once into
// a local would make the comparison and the midpoint use the same instant.
static TInstant GetDeadline(TInstant operationDeadline) {
1034+
if (TInstant::Now() >= operationDeadline) {
1035+
return operationDeadline;
1036+
}
1037+
return TInstant::Now() + (operationDeadline - TInstant::Now()) / 2;
1038+
}
1039+
10091040
private:
10101041
TString ExecutionId;
10111042
TString Database;
1043+
TInstant Deadline;
10121044
std::vector<TResultSetDescription> ResultSetsDescription;
10131045
};
10141046

@@ -1049,7 +1081,7 @@ class TForgetScriptExecutionOperationActor : public TActorBootstrapped<TForgetSc
10491081
}
10501082
}
10511083

1052-
Register(new TForgetScriptExecutionOperationQueryActor(ExecutionId, Request->Get()->Database));
1084+
Register(new TForgetScriptExecutionOperationQueryActor(ExecutionId, Request->Get()->Database, Request->Get()->Deadline));
10531085
}
10541086

10551087
void Handle(TEvForgetScriptExecutionOperationResponse::TPtr& ev) {
@@ -1847,6 +1879,7 @@ class TSaveScriptExecutionResultQuery : public TQueryBase {
18471879
};
18481880

18491881
class TSaveScriptExecutionResultActor : public TActorBootstrapped<TSaveScriptExecutionResultActor> {
1882+
static constexpr ui64 MAX_NUMBER_ROWS_IN_BATCH = 10000;
18501883
static constexpr ui64 PROGRAM_SIZE_LIMIT = 10_MB;
18511884
static constexpr ui64 PROGRAM_BASE_SIZE = 1_MB; // Depends on MAX_NUMBER_ROWS_IN_BATCH
18521885

@@ -2069,6 +2102,11 @@ class TGetScriptExecutionResultQuery : public TQueryBase {
20692102
const TMaybe<TString> serializedRow = result.ColumnParser("result_set").GetOptionalString();
20702103

20712104
if (!serializedRow) {
2105+
Finish(Ydb::StatusIds::INTERNAL_ERROR, "Result set row is null");
2106+
return;
2107+
}
2108+
2109+
if (serializedRow->Empty()) {
20722110
Finish(Ydb::StatusIds::INTERNAL_ERROR, "Result set row is empty");
20732111
return;
20742112
}

ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp

Lines changed: 7 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -155,6 +155,13 @@ class TRunScriptActor : public NActors::TActorBootstrapped<TRunScriptActor> {
155155
}
156156

157157
// Handles TEvCheckAliveRequest: logs a warning with the execution id and the
// actor's current state flags, then replies to the sender with
// TEvCheckAliveResponse.
// NOTE(review): the warning text says the lease expired, but it is emitted for
// every TEvCheckAliveRequest — confirm that this request is only sent after a
// lease expiry, otherwise the message is misleading.
void Handle(TEvCheckAliveRequest::TPtr& ev) {
158+
LOG_W("Lease was expired in database"
159+
<< ", execution id: " << ExecutionId
160+
<< ", saved final status: " << FinalStatusIsSaved
161+
<< ", wait finalization request: " << WaitFinalizationRequest
162+
<< ", is executing: " << IsExecuting()
163+
<< ", current status: " << Status);
164+
158165
Send(ev->Sender, new TEvCheckAliveResponse());
159166
}
160167

ydb/core/kqp/ut/service/ya.make

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -13,10 +13,10 @@ ELSE()
1313
ENDIF()
1414

1515
# NOTE(review): this hunk comments out kqp_document_api_ut.cpp,
# kqp_qs_queries_ut.cpp and kqp_service_ut.cpp, leaving kqp_qs_scripts_ut.cpp as
# the only source in SRCS. This looks like a local debugging leftover — confirm
# it is intended before merging, since those test sources stop being built.
SRCS(
16-
kqp_document_api_ut.cpp
17-
kqp_qs_queries_ut.cpp
16+
# kqp_document_api_ut.cpp
17+
# kqp_qs_queries_ut.cpp
1818
kqp_qs_scripts_ut.cpp
19-
kqp_service_ut.cpp
19+
# kqp_service_ut.cpp
2020
)
2121

2222
PEERDIR(

ydb/tests/tools/kqprun/.gitignore

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,3 @@
1+
sync_dir
2+
*.log
3+
*.sql
Lines changed: 101 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,101 @@
1+
FeatureFlags {
2+
EnableExternalDataSources: true
3+
EnableScriptExecutionOperations: true
4+
}
5+
6+
KQPConfig {
7+
Settings {
8+
Name: "HashJoinMode"
9+
Value: "grace"
10+
}
11+
Settings {
12+
Name: "_KqpExprNodesAllocationLimit"
13+
Value: "3000000"
14+
}
15+
Settings {
16+
Name: "_KqpExprStringsAllocationLimit"
17+
Value: "100000000"
18+
}
19+
}
20+
21+
LogConfig {
22+
DefaultLevel: 2
23+
}
24+
25+
QueryServiceConfig {
26+
MdbTransformHost: false
27+
ScriptResultRowsLimit: 0
28+
ScriptResultSizeLimit: 10485760
29+
30+
Generic {
31+
MdbGateway: "https://mdb.api.cloud.yandex.net:443"
32+
33+
Connector {
34+
UseSsl: false
35+
36+
Endpoint {
37+
host: "localhost"
38+
port: 50051
39+
}
40+
}
41+
}
42+
43+
HttpGateway {
44+
BuffersSizePerStream: 5000000
45+
ConnectionTimeoutSeconds: 15
46+
LowSpeedBytesLimit: 1024
47+
LowSpeedTimeSeconds: 20
48+
MaxInFlightCount: 2000
49+
MaxSimulatenousDownloadsSize: 2000000000
50+
RequestTimeoutSeconds: 0
51+
}
52+
53+
S3 {
54+
AllowConcurrentListings: true
55+
FileSizeLimit: 100000000000
56+
GeneratorPathsLimit: 50000
57+
ListingCallbackPerThreadQueueSize: 100
58+
ListingCallbackThreadCount: 1
59+
MaxDirectoriesAndFilesPerQuery: 500000
60+
MaxDiscoveryFilesPerQuery: 1000
61+
MaxFilesPerQuery: 500000
62+
MaxInflightListsPerQuery: 100
63+
MaxReadSizePerQuery: 1000000000000
64+
MinDesiredDirectoriesOfFilesPerQuery: 1000
65+
RegexpCacheSize: 100
66+
67+
DefaultSettings {
68+
Name: "UseBlocksSource"
69+
Value: "true"
70+
}
71+
}
72+
}
73+
74+
ResourceBrokerConfig {
75+
Queues {
76+
Name: "queue_kqp_resource_manager"
77+
Weight: 30
78+
79+
Limit {
80+
Memory: 6442450944
81+
}
82+
}
83+
84+
ResourceLimit {
85+
Memory: 6442450944
86+
}
87+
}
88+
89+
TableServiceConfig {
90+
BindingsMode: BM_DROP
91+
CompileTimeoutMs: 600000
92+
SessionsLimitPerNode: 1000
93+
94+
QueryLimits {
95+
DataQueryTimeoutMs: 3600000
96+
}
97+
98+
ResourceManager {
99+
QueryMemoryLimit: 64424509440
100+
}
101+
}

0 commit comments

Comments
 (0)